Monitor ETL process start and stop status in vert.x event bus deployment


SupRemo

Jul 16, 2018, 2:17:15 PM
to vert.x

I am a complete newbie to Vert.x and Kotlin. For an academic project I have built an ETL pipeline that fetches data from a source API and runs at a specific time every day. How long it takes to fetch the data, process it, and store it in the database is not fixed. Is there any way to log the status of the ETL process to the console or to a file? I also have an endpoint that shows the status of the ETL process, so I could use that too if possible.

I am not sure how to proceed. I am attaching some of my code snippets below, which should help.

The program flow is: Combinations -> API Requester -> Persister

class Combinator(kodein: Kodein): AbstractVerticle() {

    private val logger by lazy { LoggerFactory.getLogger(this::class.simpleName) }

    // Publish every combination once, then remove this verticle.
    override fun start() {
        this.publishCombinations()
        vertx.undeploy(this.deploymentID())
    }

    private fun publishCombinations() {
        logger.info("Generating requests")

        val productRateCombinations = DefinedEnuM.values().flatMap(this::expandTiers)
        // One event bus message per combination, consumed by the Requester.
        productRateCombinations.parallelStream().forEach {
            vertx.eventBus().send(
                    "etl.request",
                    JsonObject(Json.encode(it))
            )
        }

        logger.info("Publishing complete!")
    }

    private fun expandTiers(/* some input params */): List<Product> {
        // Several combinations of those params are made in here...
    }
}

So this sends all the combinations to the Requester. The Requester just assigns the values to HTTP parameters and makes the API call, and I get JSON data in response. I modify and extract what I need, build different objects according to my database schema, and then send those objects to the Persister to save them. I use the same method as above to send from the Requester to the Persister: vertx.eventBus().send("etl.persist", JsonObject(Json.encode(obj)))

In the Persister, I take each JsonObject passed in as the message body and update/save it.

class Persister(kodein: Kodein): AbstractExpiringVerticle(
        TimeUnit.MINUTES.toSeconds(5)) {

    private val logger by lazy { LoggerFactory.getLogger(this::class.simpleName) }
    private val ebeanServer = kodein.instance<EbeanServer>("project-db")

    override fun start() {
        // Each message on "etl.persist" carries one object to save.
        vertx.eventBus().consumer<JsonObject>("etl.persist", this::persistFrame)
        super.start()
    }

    private fun persistFrame(message: Message<JsonObject>) {
        // saving/updating it in DB... working perfectly....
    }
}

I define when this ETL process runs using cron scheduling:

"pipelines": [
 {
  "name": "ETL",
  "cron": "0 0/1 * 1/1 * ? *",
  "verticles": [
    {
      "name": "etl.Persister",
      "deploymentOptions": {
        "instances": 5,
        "worker": true
      }
    }, //same for Requester and Combinator...

A pipeline-handling class takes care of setting them up at boot:

class PipelineManager(val kodein: Kodein): AbstractVerticle() {

    companion object {
        const val deployPipelineAddress: String = "etl.deploy_pipeline"
    }

    private val logger by lazy { LoggerFactory.getLogger(this::class.simpleName) }

    private val pipelines by lazy {
        config().getJsonArray("pipelines").list as List<JsonObject>
    }

    /**
     * On boot, schedule all the tasks using the cron strings
     * in the pipeline definitions.
     */
    override fun start() {
        vertx.eventBus().consumer<JsonArray>(
                deployPipelineAddress,
                this::deployPipeline // The function definition deploys all the verticles and sets them up...
        )

        vertx.sharedData().getLock(
                "etl.cron_deploy",
                this::configurePipelines // The function definition configures all the verticles...
        )
    }

Since this ETL process is invoked automatically on its own schedule, is there a way to log when it starts and when it completes, so that I can keep track of whether it is running (or ran) properly? Keeping track manually is a pain because I have to watch it constantly.
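To make the question concrete, here is a rough sketch in plain Kotlin of the kind of tracking I have in mind. Everything in it is hypothetical: EtlRunTracker, start, itemDone and status are names I made up, and nothing here is a Vert.x API. It assumes every combination results in exactly one saved object and that the Combinator and all Persister instances run in the same JVM and share one tracker, which may not hold for my real pipeline.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not part of Vert.x): tracks a single ETL run
// by counting how many items are still waiting to be persisted.
class EtlRunTracker(private val clock: () -> Long = { System.currentTimeMillis() }) {
    private val pending = AtomicInteger(0)
    private val startedAt = AtomicLong(0)   // 0 means "no run started yet"
    private val finishedAt = AtomicLong(0)  // 0 means "run not finished yet"

    // Would be called by the Combinator before it publishes its messages.
    fun start(expectedItems: Int) {
        pending.set(expectedItems)
        finishedAt.set(0)
        startedAt.set(clock())
        println("ETL started, $expectedItems items queued")
    }

    // Would be called by the Persister after each save.
    // Returns true only for the call that completes the run.
    fun itemDone(): Boolean {
        val remaining = pending.decrementAndGet()
        if (remaining == 0) {
            finishedAt.set(clock())
            println("ETL finished in ${finishedAt.get() - startedAt.get()} ms")
            return true
        }
        return false
    }

    // Text that a status endpoint could return.
    fun status(): String = when {
        startedAt.get() == 0L -> "IDLE"
        finishedAt.get() == 0L -> "RUNNING (${pending.get()} items left)"
        else -> "FINISHED"
    }
}

fun main() {
    val tracker = EtlRunTracker()
    tracker.start(2)
    println(tracker.status())
    tracker.itemDone()
    tracker.itemDone()
    println(tracker.status())
}
```

The idea is that the Combinator would call start with the number of combinations, each Persister would call itemDone after saving, and my status endpoint would return status(). I do not know whether a shared counter like this is the right approach in Vert.x, or whether failed requests (which would never reach the Persister) make it unreliable, so treat it only as an illustration of what I am after.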

Any guidance would be appreciated. Thank you all in advance.
