Hey Guys,
I have been able to execute jobs through a REST endpoint on the standalone deployment by creating a custom service:
**************************************************************************************
package com.bph;

import java.net.URI;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.linkedin.r2.filter.compression.EncodingType;
import com.linkedin.r2.filter.compression.ServerCompressionFilter;
import com.linkedin.r2.filter.FilterChain;
import com.linkedin.r2.filter.FilterChains;
import com.linkedin.r2.transport.common.bridge.server.TransportDispatcher;
import com.linkedin.r2.transport.http.server.HttpNettyServerFactory;
import com.linkedin.r2.transport.http.server.HttpServer;
import com.linkedin.restli.docgen.DefaultDocumentationRequestHandler;
import com.linkedin.restli.server.DelegatingTransportDispatcher;
import com.linkedin.restli.server.RestLiConfig;
import com.linkedin.restli.server.RestLiServer;
import com.linkedin.restli.server.mock.InjectMockResourceFactory;
import com.linkedin.restli.server.mock.SimpleBeanProvider;
import com.linkedin.restli.server.resources.ResourceFactory;

import gobblin.metastore.JobHistoryStore;
import gobblin.metastore.MetaStoreModule;

public class JobLauncherServer extends AbstractIdleService {
  private static final Logger LOGGER = LoggerFactory.getLogger(JobLauncherServer.class);

  private final URI serverUri;
  //private final URI serverAdvertisedUri;
  private final int port;
  private final Properties properties;
  private volatile Optional<HttpServer> httpServer;

  public JobLauncherServer(Properties properties) {
    this.properties = properties;
    port = getPort(properties);
    serverUri = getServiceUri(getHost(properties), port);
    //serverAdvertisedUri = getAdvertisedUri(properties);
  }

  @Override
  protected void startUp()
      throws Exception {
    // Server configuration
    RestLiConfig config = new RestLiConfig();
    config.addResourcePackageNames(JobLauncherResource.class.getPackage().getName());
    config.setServerNodeUri(serverUri);
    config.setDocumentationRequestHandler(new DefaultDocumentationRequestHandler());

    // Handle dependency injection
    //Injector injector = Guice.createInjector(new MetaStoreModule(properties));
    //JobHistoryStore jobHistoryStore = injector.getInstance(JobHistoryStore.class);
    SimpleBeanProvider beanProvider = new SimpleBeanProvider();
    //beanProvider.add("jobHistoryStore", jobHistoryStore);

    // Use InjectMockResourceFactory to keep this Spring free
    ResourceFactory factory = new InjectMockResourceFactory(beanProvider);

    // Create and start the HTTP server
    TransportDispatcher dispatcher = new DelegatingTransportDispatcher(new RestLiServer(config));
    FilterChain filterChain = FilterChains.create(new ServerCompressionFilter(new EncodingType[] {
        EncodingType.SNAPPY,
        EncodingType.GZIP
    }));
    this.httpServer = Optional.of(new HttpNettyServerFactory(filterChain).createServer(port, dispatcher));

    LOGGER.info("Starting the job launcher server");
    LOGGER.info("Properties " + properties);
    this.httpServer.get().start();
  }

  @Override
  protected void shutDown()
      throws Exception {
    if (this.httpServer.isPresent()) {
      LOGGER.info("Stopping the job launcher information server");
      this.httpServer.get().stop();
    }
  }

  private static URI getServiceUri(String host, int port) {
    return URI.create(String.format("http://%s:%d", host, port));
  }

  private static int getPort(Properties properties) {
    return Integer.parseInt(properties.getProperty(
        Configurations.OLYFEJOBINVOKE_REST_SERVER_PORT_KEY,
        Configurations.DEFAULT_OLYFEJOBINVOKE_REST_SERVER_PORT));
  }

  private static String getHost(Properties properties) {
    return properties.getProperty(
        Configurations.OLYFEJOBINVOKE_REST_SERVER_HOST_KEY,
        Configurations.DEFAULT_OLYFEJOBINVOKE_REST_SERVER_HOST);
  }
}
**************************************************************************************
This is configured in gobblin-standalone.properties as:
app.additional.services=com.bph.JobLauncherServer
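To illustrate what a property like this implies, here is a minimal sketch of how a comma-separated list of class names could be turned into service instances with plain reflection. This is not Gobblin's loader code; DemoService and AdditionalServices are hypothetical names, and the sketch assumes every listed class has a public constructor taking java.util.Properties, as JobLauncherServer does above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical stand-in for a service such as JobLauncherServer. */
class DemoService {
  final Properties properties;

  public DemoService(Properties properties) {
    this.properties = properties;
  }
}

/** Hypothetical loader; an illustration only, not Gobblin's implementation. */
final class AdditionalServices {
  static final String KEY = "app.additional.services";

  /** Instantiates every class named in the comma-separated property value. */
  static List<Object> load(Properties properties) throws ReflectiveOperationException {
    List<Object> services = new ArrayList<>();
    for (String name : properties.getProperty(KEY, "").split(",")) {
      String className = name.trim();
      if (className.isEmpty()) {
        continue; // tolerate an unset property or stray commas
      }
      Class<?> type = Class.forName(className);
      // Assumption: each service exposes a (Properties) constructor.
      services.add(type.getDeclaredConstructor(Properties.class).newInstance(properties));
    }
    return services;
  }
}
```

With the property set to a single class name, load returns one instance built from the same Properties object that held the setting.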
The corresponding code for launching jobs in JobLauncherResource is:
JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(sysProp,jobProp));
jobLauncher.launchJob(null);
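The point of registering the launcher with the closer is that it gets closed whether or not the launch succeeds. A minimal sketch of the same launch-then-close shape using try-with-resources follows; DemoLauncher and LaunchOnce are hypothetical names standing in for the real launcher, and the only assumption is that the launcher is Closeable.

```java
import java.io.Closeable;

/** Hypothetical stand-in for a job launcher; only the launch/close shape matters. */
interface DemoLauncher extends Closeable {
  void launchJob() throws Exception;
}

final class LaunchOnce {
  /** Launches one job and always closes the launcher, even if the launch throws. */
  static void run(DemoLauncher launcher) throws Exception {
    try (DemoLauncher active = launcher) {
      active.launchJob();
    }
  }
}
```

If launchJob throws, the exception still propagates to the caller, but only after close has run.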
This implementation was done long ago, before Gobblin service existed. Gobblin service seems to take a similar approach to creating its REST endpoint, which is implemented via gobblin.restli.EmbeddedRestliServer.
I tried the standalone/MR deployment and it worked as explained here
Our requirement is to use MR with a REST endpoint, which does not exist yet. I am thinking of:
1) Researching a way to trigger the YARN deployment via the existing JobLauncherResource; I am not sure whether that can be done.
2) Implementing the REST endpoint the same way it was done for the standalone deployment.
I know Gobblin service might help, but it is tied to the AWS infrastructure, as shown here
I have also scanned the code to confirm this, so I don't want to use Gobblin service for now, as it would add an AWS piece.
I am also thinking of a different option: can we have a pluggable REST service that can be plugged into all the modules, i.e. Standalone/MR/Yarn/AWS/GobblinService?
Currently only the GobblinService (service) module has a REST endpoint.
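To make the pluggable idea concrete, here is a rough sketch of the shape I have in mind, not a proposal for the actual API. It uses the JDK's built-in com.sun.net.httpserver instead of Rest.li so that it stays self-contained. JobTrigger, PluggableJobEndpoint and the /jobs path are all hypothetical names; the assumption is that each deployment mode would supply its own JobTrigger.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical seam: each deployment mode supplies its own way to launch a job. */
interface JobTrigger {
  /** Launches the job described by jobProps and returns a short status string. */
  String launch(Properties jobProps) throws Exception;
}

/** Deployment-agnostic HTTP front end; it only knows about JobTrigger. */
final class PluggableJobEndpoint implements AutoCloseable {
  private final HttpServer server;

  /** Port 0 asks the OS for a free port; the server listens on all interfaces. */
  PluggableJobEndpoint(int port, JobTrigger trigger) throws IOException {
    server = HttpServer.create(new InetSocketAddress(port), 0);
    server.createContext("/jobs", exchange -> {
      int status;
      String body;
      if (!"POST".equals(exchange.getRequestMethod())) {
        status = 405;
        body = "POST job properties to launch a job\n";
      } else {
        try (InputStream in = exchange.getRequestBody()) {
          // The request body is read as a java.util.Properties document.
          Properties jobProps = new Properties();
          jobProps.load(new InputStreamReader(in, StandardCharsets.UTF_8));
          body = trigger.launch(jobProps) + "\n";
          status = 200;
        } catch (Exception e) {
          status = 500;
          body = "launch failed: " + e.getMessage() + "\n";
        }
      }
      byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(status, bytes.length);
      try (OutputStream out = exchange.getResponseBody()) {
        out.write(bytes);
      }
    });
  }

  void start() {
    server.start();
  }

  /** Returns the port actually bound, useful when constructed with port 0. */
  int port() {
    return server.getAddress().getPort();
  }

  @Override
  public void close() {
    server.stop(0);
  }
}
```

A standalone trigger could wrap the JobLauncherFactory call shown earlier, while an MR or YARN trigger would submit to the cluster instead; the endpoint itself would not need to change.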
Some older discussions that could give more background follow:
Real Time Ingestion using YARN.
Rest API for triggering Jobs.
>>> We are also working on some improvements on how jobs are launched which
>>> will address directly your use case (uploading Jobs through a REST API) and
>>> add support for other use cases such as creating/triggering jobs from Kafka
>>> events. We hope to introduce those enhancements in our next release - 0.9.0
Thanks,
Vicky