Porting Unit Tests in 2.4 To Netty 4.1 (Issue #918)

nwhitehead

Jan 17, 2017, 9:52:10 AM
to OpenTSDB
Asking for a quick code review.
I am working on porting OpenTSDB to Netty 4.1. The main port is complete and the basics are all working.
I am having trouble porting some of the unit tests, because the mocked Netty 3 channels don't work with Netty 4.
I decided to rewrite these tests (specifically the TSD RPC tests) using EmbeddedChannels. To support this, I added two methods to NettyMocks.
The method writeThenReadFromRpcHandler creates an EmbeddedChannel with the RpcHandler installed in it, so writing a Netty HttpRequest into the channel as an inbound message produces an HttpResponse that can be read back as an outbound message.

Additions to NettyMocks

/**
 * Writes the passed object to an instance of the {@link RpcHandler} handler in an embedded channel
 * and returns the value read back from the channel.
 * @param writeObject The object to write to the handler
 * @param tsdb The TSDB instance to create the RpcHandler with
 * @return The object read back from the channel
 */
@SuppressWarnings("unchecked")
public static <T> T writeThenReadFromRpcHandler(final Object writeObject, final TSDB tsdb) {
  final EmbeddedChannel ec = rpcHandlerChannel(tsdb);
  ec.writeInbound(writeObject);
  ec.runPendingTasks();
  return (T) ec.readOutbound();
}
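
A note on the <T> return type, sketched in plain Java rather than Netty: the cast to T is unchecked, so a type mismatch cannot fail inside the helper and instead surfaces as a ClassCastException where the caller assigns the result. The names OutboundQueue and UncheckedReadDemo are hypothetical stand-ins for illustration, not OpenTSDB or Netty classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a channel's outbound queue; this is not Netty code.
final class OutboundQueue {
  private final Queue<Object> messages = new ArrayDeque<>();

  void add(final Object msg) {
    messages.add(msg);
  }

  // Same shape as the helper above: the cast to T is unchecked, so it
  // cannot fail here. Any mismatch surfaces where the caller assigns.
  @SuppressWarnings("unchecked")
  <T> T read() {
    return (T) messages.poll();
  }
}

final class UncheckedReadDemo {
  // Returns true if reading the queued message as an Integer fails at the call site.
  static boolean failsAtCallSite(final Object queued) {
    final OutboundQueue q = new OutboundQueue();
    q.add(queued);
    try {
      final Integer value = q.read(); // the compiler inserts the Integer cast here
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(final String[] args) {
    System.out.println(failsAtCallSite("a string")); // prints "true"
    System.out.println(failsAtCallSite(42));         // prints "false"
  }
}
```

In a test this means a wrong expectation about the response type shows up as a ClassCastException on the assignment line, not as an assertion failure.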


/**
 * Creates a Netty EmbeddedChannel that routes passed messages to an instance of a {@link RpcHandler} handler.
 * @param tsdb The [mocked] TSDB instance
 * @return The embedded channel, ready for writing to and reading from
 */
public static EmbeddedChannel rpcHandlerChannel(final TSDB tsdb) {
  final RpcManager rpcManager = RpcManager.instance(tsdb);
  final RpcHandler rpcHandler = new RpcHandler(tsdb, rpcManager);
  return new EmbeddedChannel(rpcHandler);
}

Here's a sample of the current method (TestQueryRpc.postQuerySimplePass):

@Test
public void postQuerySimplePass() throws Exception {
  final DataPoints[] datapoints = new DataPoints[1];
  datapoints[0] = new MockDataPoints().getMock();
  when(query_result.runAsync()).thenReturn(
      Deferred.fromResult(datapoints));

  HttpQuery query = NettyMocks.postQuery(tsdb, "/api/query",
      "{\"start\":1425440315306,\"queries\":" +
      "[{\"metric\":\"somemetric\",\"aggregator\":\"sum\",\"rate\":true," +
      "\"rateOptions\":{\"counter\":false}}]}");
  NettyMocks.mockChannelFuture(query);
  rpc.execute(tsdb, query);
  assertEquals(HttpResponseStatus.OK, query.response().getStatus());
}

The refactored method (plus a test of the failure path, to make sure it's not a no-op) is:

@Test
public void postQuerySimplePass() throws Exception {
  HttpQuery query = NettyMocks.postQuery(tsdb, "/api/query",
      "{\"start\":1425440315306,\"queries\":" +
      "[{\"metric\":\"somemetric\",\"aggregator\":\"sum\",\"rate\":true," +
      "\"rateOptions\":{\"counter\":false}}]}");
  final FullHttpResponse response = NettyMocks.writeThenReadFromRpcHandler(query.request(), tsdb);
  assertEquals(HttpResponseStatus.OK, response.status());
}

@Test
public void postQuerySimpleFail() throws Exception {
  HttpQuery query = NettyMocks.postQuery(tsdb, "/api/query",
      "{\"start\":1425440315306,\"queries\":" +
      "[{\"metric\":\"somemetric\",\"aggregator\":\"sum\",\"rate\":true," +
      "\"rateOptions\":{\"counter\":false"); // BAD JSON !
  final FullHttpResponse response = NettyMocks.writeThenReadFromRpcHandler(query.request(), tsdb);
  assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status());
}

Both tests pass. Is this an acceptable refactoring?

Thanks.

//Nicholas

nwhitehead

Jan 17, 2017, 11:40:04 AM
to OpenTSDB
Here's an example from TestPutRpc of a test that actually checks the returned content (it passes):

@Test
public void putSingleDetails() throws Exception {
  HttpQuery query = NettyMocks.postQuery(tsdb, "/api/put?details",
      "{\"metric\":\"" + METRIC_STRING + "\",\"timestamp\":1365465600,\"value\""
      + ":42,\"tags\":{\"" + TAGK_STRING + "\":\"" + TAGV_STRING + "\"}}");
  final FullHttpResponse httpResponse = NettyMocks.writeThenReadFromRpcHandler(query.request(), tsdb);
  assertEquals(HttpResponseStatus.OK, httpResponse.status());
  final String response =
      httpResponse.content().toString(Charset.forName("UTF-8"));
  assertTrue(response.contains("\"failed\":0"));
  assertTrue(response.contains("\"success\":1"));
  assertTrue(response.contains("\"errors\":[]"));
  validateCounters(0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
  validateSEH(false);
}

nwhitehead

Jan 20, 2017, 7:05:41 AM
to OpenTSDB
Cleaned this up a bit. Rather than using the whole pipeline, I changed the helper to wrap the target HttpRpc in a ChannelDuplexHandler, which is much more lightweight.

Tests now look like this:

HttpQuery query = NettyMocks.postQuery(tsdb, "/api/put?summary",
    "[{\"metric\":\"" + METRIC_STRING + "\",\"timestamp\":1365465600,\"value\""
    + ":42,\"tags\":{\"" + TAGK_STRING + "\":\"" + TAGV_STRING
    + "\"}},{\"metric\":\"" + METRIC_B_STRING + "\","
    + "\"timestamp\":1365465600,\"value\":24,\"tags\":"
    + "{\"" + TAGK_STRING + "\":\"" + TAGV_STRING + "\"}}]");
final FullHttpResponse httpResponse = NettyMocks.writeThenReadFromChannel(tsdb, new PutDataPointRpc(tsdb.getConfig()), query.request());
assertEquals(HttpResponseStatus.OK, httpResponse.status());
final String response =
    httpResponse.content().toString(Charset.forName("UTF-8"));
assertTrue(response.contains("\"failed\":0"));
assertTrue(response.contains("\"success\":2"));

Additions to NettyMocks:

/**
 * Creates a new EmbeddedChannel containing the specified HttpRpc
 * @param tsdb The TSDB to test against
 * @param httpRpc The HttpRpc instance to invoke
 * @return The EmbeddedChannel, ready to test
 */
public static EmbeddedChannel testChannel(final TSDB tsdb, final HttpRpc httpRpc) {
  final ChannelDuplexHandler rpcWrapper = new ChannelDuplexHandler() {
    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
      final FullHttpRequest request = (FullHttpRequest) msg;
      final HttpQuery query = new HttpQuery(tsdb, request, ctx);
      httpRpc.execute(tsdb, query);
    }
  };
  return new EmbeddedChannel(rpcWrapper);
}

/**
 * Creates a new EmbeddedChannel containing the specified HttpRpc,
 * writes the passed inbound objects into it, and returns the response.
 * @param tsdb The TSDB to test against
 * @param httpRpc The HttpRpc instance to invoke
 * @param inbound The inbound objects to write
 * @return the HttpRpc response
 */
@SuppressWarnings("unchecked")
public static <T> T writeThenReadFromChannel(final TSDB tsdb, final HttpRpc httpRpc, final Object... inbound) {
  final EmbeddedChannel ec = testChannel(tsdb, httpRpc);
  try {
    ec.writeInbound(inbound);
    ec.runPendingTasks();
    T t = ec.readOutbound();
    return t;
  } catch (Exception ex) {
    return (T) handleException(ex);
  }
}
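
On the Object... inbound parameter: passing the array straight through, as ec.writeInbound(inbound) does, forwards each element as a separate argument rather than as one array-valued message. A plain-Java sketch of that behavior, with hypothetical names (VarargsForwardDemo, count, forward, forwardWrapped) and no Netty involved:

```java
final class VarargsForwardDemo {
  // Stand-in for a varargs receiver such as writeInbound(Object...).
  static int count(final Object... msgs) {
    return msgs.length;
  }

  // Passing the array as-is: each element arrives as its own argument.
  static int forward(final Object... inbound) {
    return count(inbound);
  }

  // Casting to Object first: the whole array arrives as a single argument.
  static int forwardWrapped(final Object... inbound) {
    return count((Object) inbound);
  }

  public static void main(final String[] args) {
    System.out.println(forward("a", "b"));        // prints "2"
    System.out.println(forwardWrapped("a", "b")); // prints "1"
  }
}
```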

/**
 * Handles an exception thrown from the direct invocation of the target HttpRpc
 * @param ex The thrown exception
 * @return the HttpResponse representing the thrown exception
 */
public static DefaultFullHttpResponse handleException(final Exception ex) {
  try {
    throw ex;
  } catch (BadRequestException brex) {
    final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, brex.getStatus());
    ByteBufUtil.writeUtf8(response.content(), brex.getMessage());
    final String details = brex.getDetails();
    if (details != null && !details.trim().isEmpty()) {
      ByteBufUtil.writeUtf8(response.content(), "|");
      ByteBufUtil.writeUtf8(response.content(), details.trim());
    }
    return response;
  } catch (Exception exx) {
    final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
    // getMessage() may be null, so convert it to a printable string first
    ByteBufUtil.writeUtf8(response.content(), String.valueOf(exx.getMessage()));
    return response;
  }
}
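
The same mapping could also be written without rethrowing, by testing the exception type directly. A minimal plain-Java sketch under stated assumptions: BadRequest is a hypothetical stand-in for BadRequestException, integer codes stand in for HttpResponseStatus, and ErrorMapping is an illustrative name. It also guards against getMessage() returning null, which happens for exceptions created without a message.

```java
// Hypothetical stand-in for OpenTSDB's BadRequestException; only what the sketch needs.
class BadRequest extends RuntimeException {
  private final int status;
  private final String details;

  BadRequest(final int status, final String message, final String details) {
    super(message);
    this.status = status;
    this.details = details;
  }

  int status() {
    return status;
  }

  String details() {
    return details;
  }
}

final class ErrorMapping {
  // A BadRequest carries its own status; anything else maps to 500.
  static int statusFor(final Exception ex) {
    return ex instanceof BadRequest ? ((BadRequest) ex).status() : 500;
  }

  // Builds "message" or "message|details", mirroring the helper above.
  static String bodyFor(final Exception ex) {
    // String.valueOf turns a null message into the text "null".
    final StringBuilder body = new StringBuilder(String.valueOf(ex.getMessage()));
    if (ex instanceof BadRequest) {
      final String details = ((BadRequest) ex).details();
      if (details != null && !details.trim().isEmpty()) {
        body.append('|').append(details.trim());
      }
    }
    return body.toString();
  }

  public static void main(final String[] args) {
    final Exception bad = new BadRequest(400, "bad", "  missing metric ");
    System.out.println(statusFor(bad) + " " + bodyFor(bad)); // prints "400 bad|missing metric"
    final Exception other = new IllegalStateException();
    System.out.println(statusFor(other) + " " + bodyFor(other)); // prints "500 null"
  }
}
```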

/**
 * Creates a new EmbeddedChannel containing the specified HttpRpc,
 * writes the passed inbound objects into it, and returns the channel
 * so the caller can read the response from it.
 * @param tsdb The TSDB to test against
 * @param httpRpc The HttpRpc instance to invoke
 * @param inbound The inbound objects to write
 * @return The EmbeddedChannel, ready to be read from
 */
public static EmbeddedChannel writeToChannel(final TSDB tsdb, final HttpRpc httpRpc, final Object... inbound) {
  final EmbeddedChannel ec = testChannel(tsdb, httpRpc);
  try {
    ec.writeInbound(inbound);
    ec.runPendingTasks();
  } catch (Exception ex) {
    final DefaultFullHttpResponse response = handleException(ex);
    ec.writeOutbound(response);
  }
  return ec;
}

 
//Nicholas

nwhitehead

Jan 22, 2017, 4:51:40 PM
to OpenTSDB
Please disregard. Figured out how to modify NettyMocks to run the existing unit tests almost unmodified.