Request-response pattern while using Kafka as the message broker


Can çobanoğlu

Dec 17, 2017, 7:24:46 AM
to Lagom Framework Users
Hi all,

I have a case where a user makes a request via a REST endpoint on one of my Lagom services. The request is sent through a Kafka topic to an external Flink server for processing. Once the operation has succeeded or failed, information about the outcome should be returned to the calling service. This means some kind of "correlation info" has to be carried along, so that my service (which waits for the callback) can pick up the information that is sent back and the user who made the request can get the response.

How can I achieve this? Could you propose a solution?

Thanks.

Tim Moore

Dec 17, 2017, 7:53:18 PM
to Can çobanoğlu, Lagom Framework Users
Hi,

This is a pretty general question... I think there are a lot of ways you could do this. Is there something specific you are having trouble with, or do you not know where to even start?

The first question that I would ask is whether the originating client needs to stop and wait for a response, or whether it can have a callback (maybe using a WebHook) to get the response asynchronously. What are all of the components involved?

For the correlation ID I would generate a UUID early in the process and attach that to all of the messages that flow through the system.
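
A minimal sketch of that idea in plain Java, not tied to Lagom or Kafka: the request is wrapped in an envelope that carries a UUID, and the reply copies the same ID. The `RequestEnvelope` and `ReplyEnvelope` types are names made up for this illustration.

```java
import java.util.UUID;

// Hypothetical envelope types; the names are illustrative, not from Lagom or Kafka.
final class RequestEnvelope {
    final String correlationId;
    final String payload;

    RequestEnvelope(String correlationId, String payload) {
        this.correlationId = correlationId;
        this.payload = payload;
    }

    // Generate the correlation ID once, as early as possible.
    static RequestEnvelope create(String payload) {
        return new RequestEnvelope(UUID.randomUUID().toString(), payload);
    }

    // Every downstream message copies the ID instead of generating a new one.
    ReplyEnvelope replyWith(String result) {
        return new ReplyEnvelope(correlationId, result);
    }
}

final class ReplyEnvelope {
    final String correlationId;
    final String result;

    ReplyEnvelope(String correlationId, String result) {
        this.correlationId = correlationId;
        this.result = result;
    }
}

class CorrelationDemo {
    public static void main(String[] args) {
        RequestEnvelope request = RequestEnvelope.create("create-item");
        ReplyEnvelope reply = request.replyWith("done");
        // The reply can be matched to the request by comparing the two IDs.
        System.out.println(request.correlationId.equals(reply.correlationId));
    }
}
```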

Best,
Tim

--
You received this message because you are subscribed to the Google Groups "Lagom Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to lagom-framework+unsubscribe@googlegroups.com.
To post to this group, send email to lagom-framework@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/lagom-framework/0b886e56-e6af-4d3d-9849-d360d6626a91%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Tim Moore
Senior Engineer, Lagom, Lightbend, Inc.

Can çobanoğlu

Dec 18, 2017, 4:00:56 AM
to Lagom Framework Users
Tim, thanks,

Yes, I know it's a general question, but because of Kafka's nature we don't know who the producers and consumers are. If we had a system where it was possible to wait for a specific message, as a response, carrying a correlation ID that I generated earlier, that would be fine. But how could I do this in Kafka? Does subscribing to a specific topic for responses resolve the issue? In that case I would also consume every other message arriving on that topic, which I don't think is the correct way.

From the user's point of view it is actually a simple CREATE request. In the background, though, things don't work the way they appear: the user's request has to be processed by more than one unit, such as the Flink server. Am I clear enough? :)

> The first question that I would ask is whether the originating client needs to stop and wait for a response, or whether it can have a callback (maybe using a WebHook) to get the response asynchronously. What are all of the components involved?
User "A" makes a request via a REST endpoint (Lagom) --> an event "X" is emitted to Kafka --> the Flink server receives "X", processes it, completes a job, then creates an event "Y" and emits it to Kafka again --> ?? User "A" should be notified about the emitted event "Y"


  


Tim Moore

Dec 18, 2017, 10:33:05 PM
to Can çobanoğlu, Lagom Framework Users
Yes, your Kafka consumer will be a different process than the producer. One option would be to use Lagom's cluster pub/sub to notify the waiting service.

So your service would:
  1. Create the correlation UUID
  2. Create a Future (Scala) or CompletableFuture (Java) representing the result
  3. Register a subscriber on the cluster pub/sub topic, filter the messages to find the first one with the correlation UUID, and then use it to complete the future created above
  4. Publish the message to Kafka (this is done after registering the subscriber to ensure there is no race condition)
Your Kafka consumer would then broadcast each message received from Kafka to the internal cluster pub/sub topic.
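
The ordering of these steps can be sketched in plain Java. This is only a single-process stand-in, not Lagom's cluster pub/sub: a `ConcurrentHashMap` of pending futures plays the role of the pub/sub topic, and `sendToKafka` is a hypothetical callback standing in for the real producer. `PendingReplies` and its methods are names invented here.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Single-JVM stand-in for the pub/sub step; all names here are hypothetical.
class PendingReplies {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Steps 1-4: create the ID, create the future, register it, and only then publish.
    CompletableFuture<String> request(String payload, BiConsumer<String, String> sendToKafka) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);            // register before publishing
        // Drop the entry once the future completes, whatever the outcome.
        reply.whenComplete((result, error) -> pending.remove(correlationId));
        sendToKafka.accept(correlationId, payload);   // publish last to avoid the race
        return reply;
    }

    // Called by the Kafka consumer for every reply it reads.
    // Returns false when no caller in this process is waiting for that ID.
    boolean onReply(String correlationId, String result) {
        CompletableFuture<String> reply = pending.get(correlationId);
        return reply != null && reply.complete(result);
    }

    public static void main(String[] args) {
        PendingReplies replies = new PendingReplies();
        // The fake "Kafka" answers immediately, which is the case the ordering protects against.
        CompletableFuture<String> reply = replies.request("create-item",
            (id, payload) -> replies.onReply(id, "processed " + payload));
        System.out.println(reply.join());
    }
}
```

In a multi-node deployment the Kafka consumer may run on a different node than the waiting caller, which is why the reply above suggests cluster pub/sub; a local map like this one only works when both sides live in the same process.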

Would that work for you? Is my description clear?

Best,
Tim


Can çobanoğlu

Dec 19, 2017, 12:58:11 AM
to Lagom Framework Users

I think the fifth step would be waiting for the message that my external service publishes to Kafka. Is step 3 meant for this? I mean, waiting for a message from Kafka, filtering it, and passing it to the Future.

So you mean that I should connect the Kafka source to my pub/sub topic to receive the response message, resolve it by its correlation UUID, and then publish it to the subscriber that has already subscribed with the correlation UUID created earlier?





Tim Moore

Dec 19, 2017, 1:03:57 AM
to Can çobanoğlu, Lagom Framework Users
Yes, that's what I would try.


Can çobanoğlu

Jan 2, 2018, 12:21:58 PM
to Lagom Framework Users
Hi again Tim,

I need to ask you something about my code. In your previous answer, you gave me some ideas on how to get a specific response from the pub/sub registry. I'm writing a proof of concept and I'm not sure this is the correct way. (If the call returned a Source, that would be fine!) How do I stop the stream and listen for a specific correlation ID? As you can see, both the request and the response go through REST.

@Override
public ServiceCall<String, String> doRequestAndWaitResponse(String someId) {
  // create a message with a correlation Id (UUID) and send it to kafka
  // and wait for a response with this id using pubSub
  return (String req) -> {
    // someId is my so called correlation ID
    PubSubRef<String> topic = pubSubRegistry.refFor(TopicId.of(String.class, someId));

    // omitted - publish message to kafka

    return CompletableFuture.runAsync(() -> {
      topic.subscriber().map(i -> {
        if (i == someId)
          return someId;
        return "return what ?";
      });
    }).thenApply(str -> "New response " + str);
  };
}

Thanks in advance.


Can çobanoğlu

Jan 3, 2018, 7:46:57 AM
to Lagom Framework Users
I think we resolved this by doing something like the code below, but we would like to know whether this is the best practice for using pub/sub in Lagom.

There are two service call endpoints. doRequestAndWaitResponse lets the client make a request and wait for a response. The second, publish, is only there for testing; in the real service it should be replaced by a handler for incoming Kafka events.

But a question arises: what if something goes wrong while waiting for the response message from outside? Is this the correct way to handle this kind of case?

@Override
public ServiceCall<String, NotUsed> publish(String someId) {
  return req -> {
    PubSubRef<String> topic = pubSubRegistry.refFor(TopicId.of(String.class, someId));
    topic.publish("Laaaaan !");
    return CompletableFuture.completedFuture(NotUsed.getInstance());
  };
}

@Override
public ServiceCall<String, String> doRequestAndWaitResponse(String someId) {
  // create a message with a correlation Id (UUID) and send it to kafka
  // and wait for a response with this id using pubSub
  return (String req) -> {
    // someId is my so called correlation ID
    PubSubRef<String> topic = pubSubRegistry.refFor(TopicId.of(String.class, someId));
    // omitted - publish message to kafka
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("s=============================================" + someId);
      CompletableFuture<String> future = new CompletableFuture<>();
      topic.subscriber().runForeach(i -> {
        System.out.println("s=============================================" + i);
        future.complete(i);
      }, materializer);

      return future;
    }).thenApply(f -> {
      try {
        return f.get(10l, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      } catch (TimeoutException e) {
        e.printStackTrace();
      }

      return "asd";
    });
  };
}





Tim Moore

Jan 10, 2018, 12:33:21 AM
to Can çobanoğlu, Lagom Framework Users
Hi Can,

Sorry for the delay in my reply. This is not what I would recommend, because you are blocking a thread while waiting for the future to complete. This isn't necessary at all.

It should be more like this:

@Override
public ServiceCall<String, String> doRequestAndWaitResponse(String someId) {
  // create a message with a correlation Id (UUID) and send it to kafka
  // and wait for a response with this id using pubSub
  return (String req) -> {
    // someId is my so called correlation ID
    PubSubRef<String> topic = pubSubRegistry.refFor(TopicId.of(String.class, someId));
    System.out.println("s=============================================" + someId);
    CompletableFuture<String> future = new CompletableFuture<>();
    topic.subscriber()
      .take(1) // only one message expected, so you can terminate the stream once you get it
      .runForeach(i -> {
        System.out.println("s=============================================" + i);
        future.complete(i);
      }, materializer);
    // omitted - publish message to kafka
    // note that it is important to create the subscriber before publishing,
    // to avoid missing the message if it completes quickly

    return future; // you can return this future directly... no need to create others or call "get" on it
  };
}


Does that make sense?

Best,
Tim




--
Tim Moore
Lagom Tech Lead, Lightbend, Inc.

Can çobanoğlu

Jan 10, 2018, 12:49:20 AM
to Lagom Framework Users
Hi Tim,

No problem. We had come up with a solution, but this is better. Even so, don't I need to put a timeout on the reply future? Or should I?

Many thanks.


Tim Moore

Jan 10, 2018, 9:09:31 PM
to Can çobanoğlu, Lagom Framework Users
The server already has its own set of timeouts in Lagom 1.4 (https://www.playframework.com/documentation/2.6.x/SettingsAkkaHttp) and the client probably will as well (depending on what client you are using).

You might also want to put a timeout on your topic subscriber, especially if you are not yet able to upgrade to Lagom 1.4. You could use the initialTimeout method on your subscriber stream.

    topic.subscriber()
      .initialTimeout(Duration.create(5, TimeUnit.SECONDS)) // or make it configurable
      .take(1) // only one message expected, so you can terminate the stream once you get it
      .runForeach(i -> {
        System.out.println("s=============================================" + i);
        future.complete(i);
      }, materializer);


Per the docs, this will fail the stream with a TimeoutException if the first element doesn't appear within five seconds.
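
The same fail-on-timeout behaviour can be illustrated without Akka Streams. The sketch below uses the JDK's `CompletableFuture.orTimeout` (Java 9+) rather than `initialTimeout`, so it is an analogy and not the stream operator itself; `TimeoutDemo` and `awaitReply` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {
    // Makes the reply future fail with TimeoutException unless it completes in time.
    static CompletableFuture<String> awaitReply(CompletableFuture<String> reply, long millis) {
        return reply.orTimeout(millis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverAnswered = new CompletableFuture<>();
        try {
            awaitReply(neverAnswered, 100).join();
        } catch (CompletionException e) {
            // join() wraps the original failure; the cause is the TimeoutException.
            System.out.println(e.getCause() instanceof TimeoutException);
        }
    }
}
```

Because the timeout surfaces as a failed future rather than a blocked thread, the caller receives an error response instead of waiting indefinitely.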

It just occurred to me that using runForeach this way won't report failures back to the client. I think this is even simpler and will return both success and failure correctly:

@Override
public ServiceCall<String, String> doRequestAndWaitResponse(String someId) {
  // create a message with a correlation Id (UUID) and send it to kafka
  // and wait for a response with this id using pubSub
  return (String req) -> {
    // someId is my so called correlation ID
    PubSubRef<String> topic = pubSubRegistry.refFor(TopicId.of(String.class, someId));
    System.out.println("s=============================================" + someId);
    CompletionStage<String> response = topic.subscriber()
      .initialTimeout(Duration.create(5, TimeUnit.SECONDS)) // or make it configurable
      .runWith(Sink.head(), materializer);

    // omitted - publish message to kafka
    // note that it is important to create the subscriber before publishing,
    // to avoid missing the message if it completes quickly

    return response; // you can return this future directly... no need to create others or call "get" on it
  };
}


I haven't tried actually compiling or running this, so it might contain some minor errors, but hopefully it points you in the right direction.

Cheers,
Tim


sanj...@latitudetechnolabs.com

Jan 26, 2018, 5:41:52 AM
to Lagom Framework Users
1. Your Java application will consume those messages (mining requests) from Kafka.
2. Your Java application will do the necessary processing.
3. The Kafka service needs to make a REST call to another service using the ID.
4. Your Java application will produce messages back to Kafka with the result/response of the processing.
I have completed steps 1, 2, and 4, but how do I do step 3 with an RPC call?



sanj...@latitudetechnolabs.com

Jan 27, 2018, 3:16:42 AM
to Lagom Framework Users


Make a REST call.