Trying to write a response inside an event bus consumer, but getting a "Response is already written" error

KRR

Sep 11, 2018, 6:06:33 PM
to vert.x
Hi all,

I have the following piece of code:

routingContext.vertx().eventBus().consumer("io.vertx.redis.channelForClientToPublish", received -> {
  System.out.println("Server received message: " + received.body());
  // this reply is not being sent to the publisher
  // received.reply(new JsonObject().put("responseCode", "OK").put("message", "This is a just a message"));
  client.get("response", handler -> {
    if (handler.succeeded()) {
      System.out.println("Server fetched msg successfully from Redis Server as :" + handler.result());
      // here the server needs to frame the response and send it back
      routingContext.response().setStatusCode(200).end(handler.result());
    } else if (handler.failed()) {
      System.out.println(" Failed to get Message as error is : " + handler.cause());
      routingContext.response().setStatusCode(500).end("error from server");
    }
  });
});

Here I need to wait for the event bus consumer to receive a message and, based on that message, fetch some data from the Redis server and write that data to the response.

But when I hit the same endpoint, I get the error "Response is already written".

I can't move the code that sends the response outside the handler: the call is asynchronous, so writing the response there would throw a NullPointerException.

How can I solve this problem?

Blake

Sep 11, 2018, 7:35:14 PM
to vert.x
Hey KRR,

I'm not sure if this solves your problem, but I think you need to set up your event bus handler elsewhere. Make a request to an event bus address, wait for it to reply, and then respond to the HTTP request with the reply from the event bus.

Blake

Julien Viet

Sep 12, 2018, 2:40:57 AM
to ve...@googlegroups.com
Hi,

It is complicated to understand the logic of your code, as it seems to involve the event bus, the web client and Vert.x Web.

Can you simplify it and make all the pieces more explicit?

A reproducer project would definitely help us to help you.

Julien

Grey Seal

Sep 12, 2018, 12:58:45 PM
to vert.x
As said earlier, a reproducer would be helpful. But as a guess from looking at the code, how about doing the client.get() call in the sender?


routingContext.vertx().eventBus().send("io.vertx.redis.channelForClientToPublish", message, reply -> {
  if (reply.succeeded()) {
    System.out.println("Received reply " + reply.result().body());
    client.get("response", handler -> {
      if (handler.succeeded()) {
        System.out.println("Server fetched msg successfully from Redis Server as :" + handler.result());
        // here the server needs to frame the response and send it back
        routingContext.response().setStatusCode(200).end(handler.result());
      } else if (handler.failed()) {
        System.out.println(" Failed to get Message as error is : " + handler.cause());
        routingContext.response().setStatusCode(500).end("error from server");
      }
    });
  } else {
    System.out.println("No reply");
  }
});

KRR

Sep 13, 2018, 1:58:26 AM
to vert.x
I have sample code.

1) Here I am creating a RedisClient object in the SimpleRestServer class. When the handler method for the request (a method of a child class of SimpleRestServer) is called, the RedisClient object is null.

2) So I tried moving the creation of the RedisClient and the channel subscription into the handler method of the child class, SimpleRestChild. In this case I get the exception "response is already written".

Note: the code above, on GitHub, does what is described in the first point.

To reproduce the above problem, please move the

String redis_host = "127.0.0.1";
int redis_port = 6379;
int http_port = 9001;
setRedisClient(redis_host, redis_port);
subscribeForTheServiceChannel();

part of the code into the method that handles the message.

KRR

Sep 13, 2018, 2:07:35 AM
to vert.x
@Julien 

Please let me know how to get an instance of an object (the RedisClient object in my case) inside a request handler method of a child class.

As per the code on GitHub, I am creating a RedisClient object in start() of the parent, and when I try to access that instance I get a NullPointerException.


Blake

Sep 13, 2018, 9:44:26 AM
to vert.x
Your client is null because it was never created on the instance you are calling. Your SimpleRestChild extends SimpleRestServer. Even though you started the SimpleRestServer verticle (where the client is not null), the SimpleRestChild is a separate instance. If you started (as a verticle) the child class instead of the parent class you'd have the client, but this is just bad design IMO. You should either deploy the child as a separate verticle (and create the client in it), or not use it as a verticle at all and just pass the client in as a constructor parameter to a separate class (not a verticle).
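The second option (a plain class that receives the client through its constructor) can be sketched without Vert.x at all. This is only an illustration: `KeyValueClient` and `SubscriptionHandler` are hypothetical names standing in for the RedisClient and for the class holding handleSubscription, and the `Map`-backed client is an assumption made so the sketch runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the Redis client, so the sketch runs without Vert.x. */
interface KeyValueClient {
    String get(String key);
}

/** Plain handler class (not a verticle): its dependency arrives through the constructor. */
class SubscriptionHandler {
    private final KeyValueClient client;

    SubscriptionHandler(KeyValueClient client) {
        // Fail at wiring time rather than with a NullPointerException on the first request.
        this.client = Objects.requireNonNull(client, "client");
    }

    /** Stands in for handleSubscription(RoutingContext); returns the body it would send. */
    String handle(String key) {
        String value = client.get(key);
        return value != null ? value : "not found";
    }
}

class ConstructorInjectionDemo {
    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("response", "hello");

        // The owner (the verticle, in this thread's terms) creates the client once ...
        KeyValueClient client = store::get;
        // ... and hands that same instance to the handler it wires into its routes.
        SubscriptionHandler handler = new SubscriptionHandler(client);

        System.out.println(handler.handle("response")); // prints "hello"
    }
}
```

The point of the design is that the handler object is created by the code that owns the client, so there is no second, uninitialised instance for the client field to be null on.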

KRR

Sep 13, 2018, 1:47:38 PM
to vert.x
Hi Blake,

Thanks for the response!!!

As per your guidance, I am now deploying the child verticle separately, but I am still getting the client as null.
The code snippets are below.

This is the main verticle:

package com.cd.api.redis.poc.sample.rest;

import java.util.UUID;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;

/**
 * @author krr
 */

public class SimpleRestServer extends AbstractVerticle{
 public static final String redisHost = System.getProperty("redis.host");
 public static final String http_Port = System.getProperty("https.port");
 
 
  @Override
  public void start() {
    vertx.deployVerticle("com.cd.api.redis.poc.sample.rest.SinpleRestServerChild", handler -> {
      if (handler.succeeded()) {
        System.out.println("SinpleRestServerChild is deployed successfully ");
      } else if (handler.failed()) {
        System.out.println("SinpleRestServerChild is failed to deploy for reason : " + handler.cause().getMessage());
      }
    });
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    SinpleRestServerChild child = null;
    try {
      child = (SinpleRestServerChild) Class.forName("com.cd.api.redis.poc.sample.rest.SinpleRestServerChild").newInstance();
    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
      e.printStackTrace();
    }
    router.post("/subscription").handler(child::handleSubscription);
    vertx.createHttpServer().requestHandler(router::accept).listen(55443);
    System.out.println("Server started at port : " + " 55443 ");
  }

}

I made the child a separate verticle, as below:

package com.cd.api.redis.poc.sample.rest;

import java.util.UUID;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;

public class SinpleRestServerChild extends AbstractVerticle {
  RedisClient client;

  @Override
  public void start() throws Exception {
    super.start();
    String host = "127.0.0.1";
    client = RedisClient.create(vertx, new RedisOptions().setHost(host).setPort(6379));
    client.subscribe("channelForClientToPublish", handler -> {
      if (handler.succeeded()) {
        System.out.println("SimpleRestServer Subscribed to the channel channelForClientToPublish");
      }
    });
  }

  private String getUUID() {
    UUID uid = UUID.randomUUID();
    return uid.toString();
  }

  public void handleSubscription(RoutingContext routingContext) {
    JsonObject requestJson = routingContext.getBodyAsJson();
    requestJson.put("uuid", getUUID());
    System.out.println("Checking client ");
    // Here client is null
    System.out.println(client);
    routingContext.response().end("Hey !!! ");
  }
}

My second question: since I am calling handleSubscription as a route handler, how can I pass the client object to it?

Thanks!!! 
KRR

KRR

Sep 13, 2018, 2:05:33 PM
to vert.x
Now I am trying to achieve something like the below.

The main verticle is as follows:

package com.cd.api.redis.poc.sample.rest;

import io.vertx.core.AbstractVerticle;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;

/**
 * @author krr
 */

public class SimpleRestServer extends AbstractVerticle{
 public static final String redisHost = System.getProperty("redis.host");
 public static final String http_Port = System.getProperty("https.port");
 
 
 RedisClient client;
 
  @Override
  public void start() {
    String host = "127.0.0.1";
    client = RedisClient.create(vertx, new RedisOptions().setHost(host).setPort(6379));
    // Server is subscribing itself to channel so that it can get notified when any service is
    client.subscribe("channelForClientToPublish", handler -> {
      if (handler.succeeded()) {
        System.out.println("SimpleRestServer Subscribed to the channel channelForClientToPublish");
      }
    });
    /*vertx.deployVerticle("com.cd.api.redis.poc.sample.rest.SinpleRestServerChild", handler -> {
      if (handler.succeeded()) {
        System.out.println("SinpleRestServerChild is deployed successfully ");
      } else if (handler.failed()) {
        System.out.println("SinpleRestServerChild is failed to deploy for reason : " + handler.cause().getMessage());
      }
    });*/
    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    router.post("/subscription").handler(this::handleSubscription);
    vertx.createHttpServer().requestHandler(router::accept).listen(55443);
    System.out.println("Server started at port : " + " 55443 ");
  }

  public void handleSubscription(RoutingContext routingContext) {
    SinpleRestServerChild child = null;
    try {
      child = (SinpleRestServerChild) Class.forName("com.cd.api.redis.poc.sample.rest.SinpleRestServerChild").newInstance();
    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
      e.printStackTrace();
    }
    child.handleSubscription(client, routingContext);
  }

}


And the other class, which is a child of the main verticle, is as below:

package com.cd.api.redis.poc.sample.rest;

import java.util.UUID;

import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import io.vertx.redis.RedisClient;

public class SinpleRestServerChild extends SimpleRestServer {
  private String getUUID() {
    UUID uid = UUID.randomUUID();
    return uid.toString();
  }

  public void handleSubscription(RedisClient client, RoutingContext routingContext) {
    // TODO Auto-generated method stub
    System.out.println("Getting client in service handler");
    JsonObject requestJson = routingContext.getBodyAsJson();
    requestJson.put("uuid", getUUID());
    System.out.println("Checking client ");
    // Now client is not null
    System.out.println(client);
    routingContext.response().end("Hey !!! ");
  }
}

But I am getting this warning:
WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2426 ms, time limit is 2000

So I am not sure this is the best way to meet the requirement.

Please advise.

Thanks!!!
KRR

Blake

Sep 13, 2018, 2:36:04 PM
to vert.x
Here's an example. You're not using verticles properly. If you want to use verticles, you need to communicate between them, not call methods on one verticle from another (unless you set up a service proxy, but that's beyond the scope of this right now). If you want to use

Here's an example with multiple verticles:

Here is one that I believe is more in line with what you're trying to do:

KRR

Sep 13, 2018, 7:23:12 PM
to vert.x
Thanks, Blake, for your very clear response!

I was trying the code snippet below with just one main verticle.

I encountered one more issue, which I have described in comments in the code snippet below.

I went through the documentation but could not find an answer.

public class SimpleRestServer extends AbstractVerticle {
  public static final String redisHost = System.getProperty("redis.host");
  public static final String http_Port = System.getProperty("https.port");

  RedisClient client;

  @Override
  public void start() {
    String host = "127.0.0.1";
    client = RedisClient.create(vertx, new RedisOptions().setHost(host).setPort(6379));
    // Server is subscribing itself to channel so that it can get notified when any service is
    client.subscribe("channelForClientToPublish", handler -> {
      if (handler.succeeded()) {
        System.out.println("SimpleRestServer Subscribed to the channel channelForClientToPublish");
      }
    });

    Router router = Router.router(vertx);
    router.route().handler(BodyHandler.create());
    router.post("/subscription").handler(this::handleSubscription);
    vertx.createHttpServer().requestHandler(router::accept).listen(55443);
    System.out.println("Server started at port : " + " 55443 ");
  }

  public void handleSubscription(RoutingContext routingContext) {
    JsonObject requestJson = new JsonObject(routingContext.getBodyAsString());
    requestJson.put("uuid", getUUID());
    client.set("request", requestJson.toString(), handler -> {
      System.out.println(" Server is setting value to redis client ");
      if (handler.succeeded()) {
        System.out.println("Key Stored successfully to redis server so that subscribed service can consume it");
      } else if (handler.failed()) {
        System.out.println(" failed to set key with cause : " + handler.cause());
      }
    });

    client.publish("channelForServerToPublish", "ServiceOne", res -> {
      System.out.println(" Server is going to publish msg on channel so that subcribed service will get notified ");
      if (res.succeeded()) {
        // the result is the number of subscribers that received the message, e.g. 1
        System.out.println("Publish msg successfully on channel : " + res.result().toString());
      }
    });
    receive(client, routingContext, requestJson);
  }

  private void receive(RedisClient client, RoutingContext routingContext, JsonObject requestJson) {
    // When I print the request body here, I get the correct one (as passed when hitting the URL).
    System.out.println("Request body from RoutingContext : " + routingContext.getBodyAsString());
    vertx.eventBus().consumer("io.vertx.redis.channelForClientToPublish", received -> {
      // For the first request I get the same request body that I passed through Postman.
      // But for the second and all later requests I get the request body of the first request.
      // For every new request this event bus consumer is not getting the new RoutingContext object.
      System.out.println("Request body from RoutingContext : " + routingContext.getBodyAsString());
      client.get("response", handler -> {
        if (handler.succeeded()) {
          JsonObject responseObject = new JsonObject(handler.result());
          if (responseObject.getString("uuid").equals(requestJson.getString("uuid"))) {
            routingContext.response().setStatusCode(200).end(handler.result());
          }
        } else if (handler.failed()) {
          routingContext.response().setStatusCode(500).end("error from server");
        }
      });
    });
  }

  private String getUUID() {
    UUID uid = UUID.randomUUID();
    return uid.toString();
  }

}

Blake

Sep 13, 2018, 7:33:18 PM
to vert.x
Every time you make a request (that hits the /subscription route) you're registering a new event bus consumer. Why? And when are you publishing/sending to this event bus consumer? And what are you trying to do with your Redis pub/sub?

KRR

Sep 13, 2018, 9:39:19 PM
to vert.x
Hi Blake,

The link below has an overview of what I am trying to achieve.

The requirement is as follows:

There is an API gateway, which is deployed on a VM.

And there are microservices, which are deployed on separate VMs.

I am trying to use Redis pub/sub so that the API gateway and the other services can communicate with each other.

Blake

Sep 13, 2018, 10:09:43 PM
to vert.x
Will the API gateway be a verticle? Are the other services also Vert.x services? If so, I'd recommend using cluster mode and the event bus rather than Redis (unless you specifically need Redis for some reason), because Redis just adds overhead compared with direct event bus communication.

Based on what you're trying to do, though, here's how I'd do it.

You're going to need a data structure (e.g., a HashMap, or a concurrent map if it's used by multiple verticles) that records which UUID corresponds to which RoutingContext/connection (I assume that's what you're adding the UUID for?). You then create your Redis subscription once; it will be called whenever something is published to that Redis pub/sub address. The response message needs to contain the UUID, so that you can look the UUID up and respond to the matching connection.


// instance variable on verticle or class (I'd just use the string for the UUID b/c that's what will be common between everything.)
Map<String, RoutingContext> awaitingPublishReplyMap = new HashMap<>();

route().handler(rc -> {
  String id = UUID.randomUUID().toString();
  redis.publish("receiver.for.request.address", bodyWithUUID, handler -> {
    if (handler.succeeded()) {
      awaitingPublishReplyMap.put(id, rc);
    }
  });
});

// Set this up once, e.g. where you set up your routes.
// The reason you were getting "response already written" is that the handler was being called
// EVERY TIME a publish event happened, even after you responded: you registered the handler
// and it does not get removed just because you responded to the request. It's not locally
// scoped to just that block.
redis.subscribe("once.data.is.processed.address", handler -> {
  // for brevity assuming success
  // get UUID from result.
  // get routing context from map. respond.
});
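The map-based correlation described above can be tried out in isolation. The following is a minimal sketch using only the JDK, not Vert.x or Redis: `PendingReplies` is a hypothetical helper name, and the `Consumer<R>` responder stands in for whatever writes the HTTP response (in the thread's terms, something that ends the response of the stored RoutingContext). One deliberate difference from the outline above is that the entry is removed when it is completed, so a repeated notification for the same UUID is ignored instead of writing twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: tracks requests waiting for a reply, keyed by correlation id. */
class PendingReplies<R> {
    private final Map<String, Consumer<R>> waiting = new ConcurrentHashMap<>();

    /** Registers a responder and returns the id to embed in the outgoing message. */
    String register(Consumer<R> responder) {
        String id = UUID.randomUUID().toString();
        waiting.put(id, responder);
        return id;
    }

    /** Completes the request with this id at most once; false for unknown or repeated ids. */
    boolean complete(String id, R reply) {
        // Remove first, so a duplicate notification for the same id becomes a no-op.
        Consumer<R> responder = waiting.remove(id);
        if (responder == null) {
            return false;
        }
        responder.accept(reply);
        return true;
    }

    int size() {
        return waiting.size();
    }
}

class PendingRepliesDemo {
    public static void main(String[] args) {
        PendingReplies<String> pending = new PendingReplies<>();
        List<String> written = new ArrayList<>();

        // Per request: register a responder, send the id along with the request.
        String id = pending.register(written::add);

        // Single, long-lived subscription: look the id up and respond.
        System.out.println(pending.complete(id, "ok"));    // prints "true"
        System.out.println(pending.complete(id, "again")); // prints "false"
        System.out.println(written);                       // prints "[ok]"
    }
}
```

In a real gateway the entries would also need a timeout, so that requests whose reply never arrives do not stay in the map forever; that part is left out here.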

KRR

Sep 14, 2018, 7:28:14 AM
to vert.x
Blake 

It is required to use Redis instead of clustering.

I will try to use a HashMap. Thanks for the suggestion.

Just out of curiosity, I would like to know why, in the code snippet below, the RoutingContext is not updated for every new request.

Why is the RoutingContext object inside eventbus.consumer() always the one that was created for the first request?

How does eventbus.consumer() really work?


This is the API gateway (it is just a sample):

public class SimpleRestServer extends AbstractVerticle{
 public static final String redisHost = System.getProperty("redis.host");
 public static final String http_Port = System.getProperty("https.port");
 
 
 RedisClient client;
 
  @Override
  public void start() {
  String host = "127.0.0.1";
    client = RedisClient.create(vertx, new RedisOptions().setHost(host).setPort(6379));  
// Server is subscribing itself to channel so that it can get notifed when any service is
client.subscribe("channelForClientToPublish", handler -> {
   if(handler.succeeded()) {
                       // RoutingContext object is same(which got created for first request) for every request I send. 
 System.out.println("Request body from RoutingContext : " + routingContext.getBodyAsString());
  client.get("response", handler -> {
  if(handler.succeeded()) {
  JsonObject responseObject = new JsonObject(handler.result());
  if(responseObject.getString("uuid").equals(requestJson.getString("uuid"))){
  routingContext.response().setStatusCode(200).end(handler.result());
  }
  }else if(handler.failed()) {
  routingContext.response().setStatusCode(500).end("error from server");
  }
  });
  });
}
private String getUUID()
{
UUID uid= UUID.randomUUID();
return uid.toString();
}

}


This is the service code:

public class SimpleMicroService extends AbstractVerticle {

  public static final String redisHost = System.getProperty("redis.host");

  @Override
  public void start() {
    String host = "127.0.0.1";
    //String host = getRedisHost();
    final RedisClient client = RedisClient.create(vertx, new RedisOptions().setHost(host).setPort(6379));

    client.subscribe("channelForServerToPublish", handler -> {
      if (handler.succeeded()) {
        System.out.println(" ServiceOne Subscribed to channel name channelForServerToPublish ");
      }
    });

    vertx.eventBus().consumer("io.vertx.redis.channelForServerToPublish", received -> {
      System.out.println("Received message: " + received.body());
      JsonObject jsonObject = (JsonObject) received.body();

      if (jsonObject.getString("status").equals("ok")
          && jsonObject.getJsonObject("value").getString("message").equals("ServiceOne")) {
        client.get("request", handler -> {
          if (handler.succeeded()) {
            System.out.println("ServiceOne got msg from Redis Server successfully as :" + handler.result());
            //received.reply(new JsonObject().put("responseCode", "OK").put("message", "This is a passed response "));
            // frame the JSON object here, set it on the Redis server and publish a notification on the
            // channel so that the server is notified that the response is there in the Redis server
            JsonObject requestJson = new JsonObject(handler.result());
            JsonObject response = new JsonObject().put("status", "ok").put("statusCode", 200)
                .put("body", new JsonObject().put("key1", "value1").put("key2", "value2"))
                .put("uuid", requestJson.getString("uuid"));
            System.out.println("Setting UUID in response : " + response.getString("uuid"));
            client.set("response", response.toString(), responseSetHandler -> {
              if (responseSetHandler.succeeded()) {
                client.publish("channelForClientToPublish", "ResponseNotification", res -> {
                  System.out.println(" ServiceOne is going to publish msg on channel so that Server will get notified ");
                  if (res.succeeded()) {
                    // the result is the number of subscribers that received the message, e.g. 1
                    System.out.println("ServerOne Publish msg successfully on channel : " + res.result().toString());
                  }
                  if (res.failed()) {
                    System.out.println("ServiceOne failed to publish msg on channel");
                  }
                });
              }
              if (responseSetHandler.failed()) {
                System.out.println(" Service1 failed set the response to Redis server with reason : " + responseSetHandler.cause().getMessage());
              }
            });
          } else if (handler.failed()) {
            System.out.println(" Failed to get Message as error is : " + handler.cause());
            // received.reply(new JsonObject().put("responseCode", "ERROR").put("message", "This is failed response "));
          }
        });
      }
    });
  }

  private String getRedisHost() {
    // TODO Auto-generated method stub
    return redisHost;
  }
}

Blake

Sep 14, 2018, 9:47:22 AM
to vert.x
The code you initially posted had many errors with it. Do not try to keep using that code. Do not create a lasting event bus subscription on each request (for each new routing context). I pointed this out to you in the last response/example. Every time you get a new request you're setting up a new consumer subscription that is REGISTERED on the event bus and is not removed when the request ends. Therefore the callback has a reference to the old routing context. If it's not registering multiple consumers (every time a new request comes in) for that event bus address then I imagine that vertx simply keeps the first one that is registered - thus, you're always getting your first routing context.
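The effect of registering a consumer per request can be reproduced with a toy model. The sketch below uses only the JDK and is not the Vert.x event bus: `ToyBus` and `OneShotResponse` are hypothetical stand-ins, and the model assumes every registered handler receives each message (publish-style delivery). With point-to-point delivery the details would differ, but an old handler holding an old response could still be the one that is picked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for an HTTP response that, like a real one, can only be ended once. */
class OneShotResponse {
    private final String name;
    private boolean ended;

    OneShotResponse(String name) {
        this.name = name;
    }

    void end(String body) {
        if (ended) {
            throw new IllegalStateException("Response has already been written (" + name + ")");
        }
        ended = true;
    }
}

/** Toy stand-in for a bus address: handlers stay registered until explicitly removed. */
class ToyBus {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void consumer(Consumer<String> handler) {
        handlers.add(handler);
    }

    int handlerCount() {
        return handlers.size();
    }

    /** Delivers to every registered handler and returns the error messages that were raised. */
    List<String> publish(String message) {
        List<String> errors = new ArrayList<>();
        for (Consumer<String> handler : handlers) {
            try {
                handler.accept(message);
            } catch (IllegalStateException e) {
                errors.add(e.getMessage());
            }
        }
        return errors;
    }
}

class StaleConsumerDemo {
    /** Mimics the per-request handler: registers a NEW consumer capturing this request's response. */
    static void handleRequest(ToyBus bus, OneShotResponse response) {
        bus.consumer(message -> response.end(message));
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();

        handleRequest(bus, new OneShotResponse("request-1"));
        System.out.println(bus.publish("reply-1")); // prints "[]"

        handleRequest(bus, new OneShotResponse("request-2"));
        // The handler from request-1 is still registered and fires again on its old response.
        System.out.println(bus.publish("reply-2"));
        // prints "[Response has already been written (request-1)]"
    }
}
```

Each lambda keeps a reference to the response it was created with, so nothing "updates" it on later requests; the only ways out are to register the consumer once, or to unregister it when the request is finished.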

KRR

Sep 14, 2018, 11:58:09 AM
to vert.x
Thanks a lot, Blake, for patiently answering my queries and for providing sample code to illustrate.

