Unified RestEndpoint for all configurations.

Vicky Kak

Apr 21, 2017, 2:13:02 AM
to gobblin-users
Hey Guys,

I have been able to execute jobs through a REST endpoint in the standalone deployment by creating a custom service:

**************************************************************************************
package com.bph;

import java.net.URI;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Guice;
import com.google.inject.Injector;

import com.linkedin.r2.filter.compression.EncodingType;
import com.linkedin.r2.filter.compression.ServerCompressionFilter;
import com.linkedin.r2.filter.FilterChain;
import com.linkedin.r2.filter.FilterChains;
import com.linkedin.r2.transport.common.bridge.server.TransportDispatcher;
import com.linkedin.r2.transport.http.server.HttpNettyServerFactory;
import com.linkedin.r2.transport.http.server.HttpServer;
import com.linkedin.restli.docgen.DefaultDocumentationRequestHandler;
import com.linkedin.restli.server.DelegatingTransportDispatcher;
import com.linkedin.restli.server.RestLiConfig;
import com.linkedin.restli.server.RestLiServer;
import com.linkedin.restli.server.mock.InjectMockResourceFactory;
import com.linkedin.restli.server.mock.SimpleBeanProvider;
import com.linkedin.restli.server.resources.ResourceFactory;

import gobblin.metastore.JobHistoryStore;
import gobblin.metastore.MetaStoreModule;

public class JobLauncherServer extends AbstractIdleService {

  private static final Logger LOGGER = LoggerFactory.getLogger(JobLauncherServer.class);

  private final URI serverUri;
  //private final URI serverAdvertisedUri;
  private final int port;
  private final Properties properties;
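  // Absent until startUp() succeeds, so shutDown() is safe even if startup failed early
  private volatile Optional<HttpServer> httpServer = Optional.absent();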

  public JobLauncherServer(Properties properties) {
    this.properties = properties;
    port = getPort(properties);
    serverUri = getServiceUri(getHost(properties), port);
    //serverAdvertisedUri = getAdvertisedUri(properties);
  }

  @Override
  protected void startUp()
      throws Exception {
    // Server configuration
    RestLiConfig config = new RestLiConfig();
    config.addResourcePackageNames(JobLauncherResource.class.getPackage().getName());
    config.setServerNodeUri(serverUri);
    config.setDocumentationRequestHandler(new DefaultDocumentationRequestHandler());

    // Handle dependency injection
    //Injector injector = Guice.createInjector(new MetaStoreModule(properties));
    //JobHistoryStore jobHistoryStore = injector.getInstance(JobHistoryStore.class);
    SimpleBeanProvider beanProvider = new SimpleBeanProvider();
    //beanProvider.add("jobHistoryStore", jobHistoryStore);
    // Use InjectMockResourceFactory to keep this Spring free
    ResourceFactory factory = new InjectMockResourceFactory(beanProvider);

    // Create and start the HTTP server
    TransportDispatcher dispatcher = new DelegatingTransportDispatcher(new RestLiServer(config, factory));
    FilterChain filterChain = FilterChains.create(new ServerCompressionFilter(new EncodingType[] {
            EncodingType.SNAPPY,
            EncodingType.GZIP
    }));
    this.httpServer = Optional.of(new HttpNettyServerFactory(filterChain).createServer(port, dispatcher));
    LOGGER.info("Starting the job launcher server");
    LOGGER.info("Properties "+properties);
    this.httpServer.get().start();
  }

  @Override
  protected void shutDown()
      throws Exception {
    if (this.httpServer.isPresent()) {
      LOGGER.info("Stopping the job launcher information server");
      this.httpServer.get().stop();
    }
  }

  private static URI getServiceUri(String host, int port) {
    return URI.create(String.format("http://%s:%d", host, port));
  }

  private static int getPort(Properties properties) {
    return Integer.parseInt(properties.getProperty(
            Configurations.OLYFEJOBINVOKE_REST_SERVER_PORT_KEY,
            Configurations.DEFAULT_OLYFEJOBINVOKE_REST_SERVER_PORT));
  }

  private static String getHost(Properties properties) {
    return properties.getProperty(
    Configurations.OLYFEJOBINVOKE_REST_SERVER_HOST_KEY,
    Configurations.DEFAULT_OLYFEJOBINVOKE_REST_SERVER_HOST);
  }
  
}

**************************************************************************************
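The Configurations class referenced in getPort() and getHost() is not shown above. A minimal sketch of what it could look like follows; the property key names and the default values are assumptions made for illustration, not the ones actually used.

```java
// Hypothetical stand-in for the Configurations class used by JobLauncherServer.
// The key names and default values below are assumptions, not the original ones.
final class Configurations {

  // Key and default for the host the REST server advertises in its URI
  static final String OLYFEJOBINVOKE_REST_SERVER_HOST_KEY = "olyfejobinvoke.rest.server.host";
  static final String DEFAULT_OLYFEJOBINVOKE_REST_SERVER_HOST = "localhost";

  // Key and default for the port; kept as a String because
  // Properties.getProperty(key, default) works on Strings
  static final String OLYFEJOBINVOKE_REST_SERVER_PORT_KEY = "olyfejobinvoke.rest.server.port";
  static final String DEFAULT_OLYFEJOBINVOKE_REST_SERVER_PORT = "8080";

  private Configurations() {
    // Constants holder, not meant to be instantiated
  }
}
```

With constants like these, the host and port can be overridden in the same properties file that registers the service, and the defaults apply when the keys are missing.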

This is configured in gobblin-standalone.properties as:
app.additional.services=com.bph.JobLauncherServer

The corresponding code for launching jobs in JobLauncherResource is:

JobLauncher jobLauncher = closer.register(JobLauncherFactory.newJobLauncher(sysProp,jobProp));
jobLauncher.launchJob(null);


This implementation was done long ago, before the Gobblin service existed. The Gobblin service seems to use a similar approach for its REST endpoint, which is created via gobblin.restli.EmbeddedRestliServer.

I tried standalone/mr deployment and it worked as explained here

Our requirement is to use MR with a REST endpoint, which does not exist. I am thinking of:
1) Researching a way to trigger the YARN deployment via the existing JobLauncherResource; I am not sure whether that can be done.
2) Implementing the REST endpoint in the same way as it was done for the standalone deployment.

I know the Gobblin service might help, but it is tied to the AWS infrastructure as shown here.
I have also scanned the code to confirm this, so I don't want to use the Gobblin service for now, as it would add an AWS piece.

I am also thinking of a different option: can we have a pluggable REST service that can be plugged into all the modules, i.e. Standalone/MR/Yarn/AWS/GobblinService?
Currently only the GobblinService (service) module has a REST endpoint.
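
To make the pluggable idea concrete, here is a rough sketch that uses only the JDK's built-in HTTP server. Every name in it (JobTrigger, PluggableJobEndpoint, the /jobs path) is hypothetical and not an existing Gobblin API; it only illustrates keeping the REST layer separate from the deployment-specific launch logic.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical SPI: each deployment mode (standalone, MR, YARN, ...) would
// supply its own implementation of the actual launch step.
interface JobTrigger {
  // Launches the job described by jobProps and returns an identifier for it
  String launch(Properties jobProps) throws Exception;
}

// Hypothetical REST front end that knows nothing about the deployment mode.
final class PluggableJobEndpoint {
  private final HttpServer server;

  PluggableJobEndpoint(int port, JobTrigger trigger) throws IOException {
    server = HttpServer.create(new InetSocketAddress(port), 0);
    server.createContext("/jobs", exchange -> {
      int status;
      String body;
      if (!"POST".equals(exchange.getRequestMethod())) {
        status = 405;
        body = "POST a job properties file to /jobs";
      } else {
        try (InputStream in = exchange.getRequestBody()) {
          // The request body is parsed as a standard .properties file
          Properties jobProps = new Properties();
          jobProps.load(in);
          body = String.valueOf(trigger.launch(jobProps));
          status = 200;
        } catch (Exception e) {
          status = 500;
          body = String.valueOf(e.getMessage());
        }
      }
      byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(status, bytes.length);
      try (OutputStream out = exchange.getResponseBody()) {
        out.write(bytes);
      }
    });
  }

  void start() {
    server.start();
  }

  void stop() {
    server.stop(0);
  }

  // Actual bound port, useful when the endpoint was created with port 0
  int port() {
    return server.getAddress().getPort();
  }
}
```

In this sketch a standalone implementation of JobTrigger could wrap the JobLauncherFactory.newJobLauncher(...) call shown earlier, while an MR or YARN implementation would submit the job in its own way; the endpoint itself would not change.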

Some older discussions that could give more background information follow:

Real Time Ingestion using YARN.

Rest API for triggering Jobs.
>>> We are also working on some improvements on how jobs are launched which 
>>> will address directly your use case (uploading Jobs through a REST API) and 
>>> add support for other use cases such as creating/triggering jobs from Kafka 
>>> events. We hope to introduce those enhancements in our next release - 0.9.0


Thanks,
Vicky

Vicky Kak

Apr 21, 2017, 2:15:01 AM
to gobblin-users


On Friday, April 21, 2017 at 11:43:02 AM UTC+5:30, Vicky Kak wrote:

Rest API for triggering Jobs.
>>> We are also working on some improvements on how jobs are launched which 
>>> will address directly your use case (uploading Jobs through a REST API) and 
>>> add support for other use cases such as creating/triggering jobs from Kafka 
>>> events. We hope to introduce those enhancements in our next release - 0.9.0


Is this done?

Thanks,
Vicky 