Lettuce, Rx, and batching

1,001 views
Skip to first unread message

Michael Moores

unread,
Oct 9, 2017, 4:14:17 PM10/9/17
to lettuce-redis-client-users
Maybe I should be posting here instead of raising question issues...
In the future I'll start here.

I raised this issue regarding how I could control redis command flushing in order to pipeline, and do this with RxJava 1 or 2.
I'm getting pretty good performance with Jedis, but we have to maintain large connection pools to do this as connections are not thread safe.
I'm executing hundreds to a few thousand commands per read only request- and doing this with Jedis pipeline is pretty darn fast.
But we have to manage a huge number of connections to Redis, and Redis is not liking this.  I was thinking that Lettuce would be ideal.
But it appears there's no way to pipeline with reactive.  As well, no way to share connections and do pipelining.

See below.

https://github.com/lettuce-io/lettuce-core/issues/624 :

I'm currently using 4.4.0.Final, as well as experimenting with your new 5.5.0 release.
I'm trying to port our Jedis based impl over to Lettuce.
I have a nice/clean RxJava1 implementation working, but the problem is that it does not look possible to control command flushing through RxJava. I think this is why my Lettuce impl is much slower than my Jedis impl which uses the Jedis pipeline API extensively. When running with DEBUG logging, I can see the flushing on every Observable.
I want to confirm that it's not possible to mingle RxJava Observables with explicit command flushing coordination.


We also noticed that Redis slows down when we try to create lots of connections to Redis.
We use connection pools with Jedis because we have to. We were thinking that going to Lettuce would allow us to use 1 connection or a much smaller pool of connections, with the assumption that connections can be shared across threads. But if we take advantage of the asynchronous API and command batching, it looks like we also need pooling because the connection state must be managed per thread in order to pipeline.


Can you confirm these assumptions?


Michael Moores

Mark Paluch

unread,
Oct 11, 2017, 2:45:56 AM10/11/17
to lettuce-redis-client-users
This question was answered on Github. The mailing list is indeed the better channel to discuss general questions with a broader audience.

Cheers, 
Mark

Michael Moores

unread,
Oct 11, 2017, 12:10:36 PM10/11/17
to lettuce-redis-client-users
One question though regarding the example code you provided and hmget.
In the method signature for hmget, the field should be a list of values or vargs as in the API: 
RedisFuture<List<KeyValue<K,V>>> hmget(K key, K... fields)

interface MyCommands extends Commands, BatchExecutor {

    RedisFuture<Set<String>> smembers(String key, CommandBatching batching);

    RedisFuture<Map<String, String>> hmget(String key, String field, CommandBatching batching);   // ???????????????
}

I'm trying to make this work, but I keep get an exception  "ERR wrong number of arguments for 'hmget' command".

One twist, I'm doing this with scala.  I have the first three methods working, have not tested the mget, and hmget is a problem.
So what should the java java equivalent hmget method signature be if vargs must be the last argument and I am using a CommandBatching argument?


trait PropertyCommands extends Commands with BatchExecutor {

  def get(key: Array[Byte], batching: CommandBatching): RedisFuture[Array[Byte]]

  def smembers(key: Array[Byte], batching: CommandBatching): RedisFuture[java.util.Set[Array[Byte]]]

  def hgetall(key: Array[Byte], batching: CommandBatching): RedisFuture[java.util.Map[Array[Byte], Array[Byte]]]

  def mget(keys: java.util.List[Array[Byte]], batching: CommandBatching): RedisFuture[java.util.Set[Array[Byte]]]

  def hmget(
    batching: CommandBatching,
    key: Array[Byte],
    fields: Array[Byte]*
  ): RedisFuture[java.util.Map[Array[Byte], Array[Byte]]]
}

Mark Paluch

unread,
Oct 11, 2017, 2:51:29 PM10/11/17
to lettuce-redis-client-users
Hi Michael, 

regarding the varargs: HMGET accepts one to many hash fields. You’re not required to use varargs if you know that you always fetch a static number of fields – in that case simply declare hmget as: hmget(String key, String field1, String field2, …). 

If the number of fields varies between calls, either use varargs or pass a collection of field names to the call. Command interfaces append/unroll all collection elements to materialize the command which is sent to Redis. However, I made a mistake with the return type as HMGET returns a list of values, not a map. I mixed up things with HGETALL.

The fixed signature using varargs would then look:

RedisFuture<List<String>> hmget(String key, CommandBatching batching,  String... field);

Not sure why you get an error with wrong number of args. Did you check how the command looks like when it’s received by Redis? (using the MONITOR command).

I’m totally unfamiliar with Scala. Any parameter that implements java.lang.Iterable is unrolled and values are appended at the command position to form the final command. CommandBatching is a special argument that is ignored to build the command, you can place it anywhere you like. I moved CommandBatching just for the sample to the very end.

Cheers, 
Mark

--
You received this message because you are subscribed to the Google Groups "lettuce-redis-client-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to lettuce-redis-clien...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/lettuce-redis-client-users/ca2ac181-9a27-4737-9b52-d95790078f10%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Michael Moores

unread,
Oct 11, 2017, 4:50:23 PM10/11/17
to lettuce-redis-client-users
Odd indeed.  No I'm not seeing redis logging the command.  Looks like a redis error message but MONITOR is not logging it.

I can't find the code that does this work based on the stack trace i'm seeing.

If I use a java.util.List or actual vargs it looks like this HMGET is being called with only the key and not the fields.
If I change the method to pass in exactly 2 fields it works fine.
Maybe I'll create a java class interface and try that.
So I have GET/SMEMBERS/HGETALL working fine so far.  scala just creates java classes, but maybe something is different here under the cover.

def hmget(
batching: CommandBatching, // Verified that CommandBatching can be in any parameter location.
key: Array[Byte],
f1: Array[Byte],
f2: Array[Byte]
): RedisFuture[java.util.List[Array[Byte]]] // Broken impl in lettuce is not returning a map.
}


def hmget(
batching: CommandBatching, // Verified that CommandBatching can be in any parameter location.
key: Array[Byte],
fields: java.util.List[Array[Byte]], // For some reason it looks like this is not passed in to REDIS.
): RedisFuture[java.util.List[Array[Byte]]] // Broken impl in lettuce is not returning a map.


1507753787.915358 [0 172.17.37.2:60484] "GET" "Property:16827714"
1507753787.915375 [0 172.17.37.2:60484] "SMEMBERS" "MarginDelta:16827714"
1507753787.915380 [0 172.17.37.2:60484] "SMEMBERS" "MarginRule:16827714"
1507753787.915395 [0 172.17.37.2:60484] "HGETALL" "RoomType:16827714"
1507753788.006914 [0 172.17.37.2:60484] "HGETALL" "RatePlan:201788707"
1507753788.006932 [0 172.17.37.2:60484] "SMEMBERS" "RoomTypeAge:201788707"
1507753788.006943 [0 172.17.37.2:60484] "SMEMBERS" "RoomTypeBedType:201788707"
1507753788.006949 [0 172.17.37.2:60484] "SMEMBERS" "RoomTypeThreshold:201788707"
--- should be here but an exception is thrown --

Error during batch command execution
io.lettuce.core.dynamic.batch.BatchException: Error during batch command execution
at io.lettuce.core.dynamic.BatchExecutableCommand.synchronize(BatchExecutableCommand.java:97)
at io.lettuce.core.dynamic.BatchExecutableCommandLookupStrategy$1.execute(BatchExecutableCommandLookupStrategy.java:78)
at io.lettuce.core.dynamic.RedisCommandFactory$CommandFactoryExecutorMethodInterceptor.invoke(RedisCommandFactory.java:220)
at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$MethodInterceptorContext.proceed(MethodInterceptorChain.java:117)
at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$PooledMethodInvocation.proceed(MethodInterceptorChain.java:194)
at io.lettuce.core.dynamic.intercept.DefaultMethodInvokingInterceptor.invoke(DefaultMethodInvokingInterceptor.java:45)
at io.lettuce.core.dynamic.intercept.MethodInterceptorChain$MethodInterceptorContext.proceed(MethodInterceptorChain.java:117)
at io.lettuce.core.dynamic.intercept.MethodInterceptorChain.invoke(MethodInterceptorChain.java:79)
at io.lettuce.core.dynamic.intercept.InvocationProxyFactory$InterceptorChainInvocationHandler.handleInvocation(InvocationProxyFactory.java:101)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
at com.sun.proxy.$Proxy9.flush(Unknown Source)
at com.expedia.lodging.pricing.repositories.cached.lettuce.LettuceAsyncCacheRepository.getRoomDetail(LettuceAsyncCacheRepository.scala:106)
at com.expedia.lodging.pricing.repositories.cached.lettuce.LettuceAsyncCacheRepository.$anonfun$getProperty$3(LettuceAsyncCacheRepository.scala:56)
at scala.Option.map(Option.scala:146)
at com.expedia.lodging.pricing.repositories.cached.lettuce.LettuceAsyncCacheRepository.getProperty(LettuceAsyncCacheRepository.scala:56)
at com.expedia.lodging.pricing.cached.AsyncBatchSpec.$anonfun$new$1(AsyncBatchSpec.scala:22)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at com.expedia.lodging.pricing.cached.AsyncBatchSpec.org$scalamock$scalatest$AbstractMockFactory$$super$withFixture(AsyncBatchSpec.scala:15)
at org.scalamock.scalatest.AbstractMockFactory.$anonfun$withFixture$1(AbstractMockFactory.scala:35)
at org.scalamock.MockFactoryBase.withExpectations(MockFactoryBase.scala:53)
at org.scalamock.MockFactoryBase.withExpectations$(MockFactoryBase.scala:45)
at com.expedia.lodging.pricing.cached.AsyncBatchSpec.withExpectations(AsyncBatchSpec.scala:15)
at org.scalamock.scalatest.AbstractMockFactory.withFixture(AbstractMockFactory.scala:34)
at org.scalamock.scalatest.AbstractMockFactory.withFixture$(AbstractMockFactory.scala:31)
at com.expedia.lodging.pricing.cached.AsyncBatchSpec.withFixture(AsyncBatchSpec.scala:15)
at org.scalatest.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:1680)
at org.scalatest.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:1692)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FlatSpecLike.runTest(FlatSpecLike.scala:1692)
at org.scalatest.FlatSpecLike.runTest$(FlatSpecLike.scala:1674)
at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike.$anonfun$runTests$1(FlatSpecLike.scala:1750)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
at scala.collection.immutable.List.foreach(List.scala:378)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FlatSpecLike.runTests(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpecLike.runTests$(FlatSpecLike.scala:1749)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
at org.scalatest.Suite.run(Suite.scala:1147)
at org.scalatest.Suite.run$(Suite.scala:1129)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike.$anonfun$run$1(FlatSpecLike.scala:1795)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FlatSpecLike.run(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpecLike.run$(FlatSpecLike.scala:1793)
at org.scalatest.FlatSpec.run(FlatSpec.scala:1685)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1340)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:378)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Suppressed: io.lettuce.core.RedisCommandExecutionException: ERR wrong number of arguments for 'hmget' command
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:85)
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:46)
at io.lettuce.core.dynamic.BatchExecutableCommand.synchronize(BatchExecutableCommand.java:93)
... 67 more
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR wrong number of arguments for 'hmget' command
at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:116)
at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:107)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:513)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:485)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:745)
Suppressed: io.lettuce.core.RedisCommandExecutionException: ERR wrong number of arguments for 'hmget' command
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:85)
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:46)
at io.lettuce.core.dynamic.BatchExecutableCommand.synchronize(BatchExecutableCommand.java:93)
... 67 more

Michael Moores

unread,
Oct 11, 2017, 5:11:59 PM10/11/17
to lettuce-redis-client-users
So I tried the java interface and I get the same error.  You said an Iterable would be ok?  (this argument contains 1 to 30 entries - dates in a month, so it is variable length)

public interface JavaPropertyCommands extends Commands, BatchExecutor {

RedisFuture<byte[]> get(byte[] key, CommandBatching batching);

RedisFuture<Set<byte[]>> smembers(byte[] key, CommandBatching batching);

RedisFuture<Map<byte[], byte[]>> hgetall(byte[] key, CommandBatching batching);

RedisFuture<List<byte[]>> hmget(CommandBatching batching, byte[] key, List<byte[]> fields);

}

Michael Moores

unread,
Oct 11, 2017, 6:11:17 PM10/11/17
to lettuce-redis-client-users
Hi Mark, Here is the repro, based on what I think the method sig for hmget should be.
Does this produce an invalid HMGET call for you?

Caused by: io.lettuce.core.RedisCommandExecutionException: ERR wrong number of arguments for 'hmget' command
at io.lettuce.core.protocol.AsyncCommand.completeResult(AsyncCommand.java:116)
at io.lettuce.core.protocol.AsyncCommand.complete(AsyncCommand.java:107)
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:513)
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:485)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)



package lettuce;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.batch.BatchExecutor;
import io.lettuce.core.dynamic.batch.CommandBatching;

import java.util.List;
import java.util.Map;
import java.util.Set;

// A java based command interface that mirrors the PropertyCommands.
// Using this to prove that the java version also has a problem in Lettuce with hmget:
//  io.lettuce.core.RedisCommandExecutionException: ERR wrong number of arguments for 'hmget' command.
// Lettuce has no problem if I pass in 5 individual parameters for each field entry.  But vargs or a java.util.Iterable fail here
// when they should not.
public interface JavaPropertyCommands extends Commands, BatchExecutor {

    RedisFuture<byte[]> get(byte[] key, CommandBatching batching);

    RedisFuture<Set<byte[]>> smembers(byte[] key, CommandBatching batching);

    RedisFuture<Map<byte[], byte[]>> hgetall(byte[] key, CommandBatching batching);

    RedisFuture<List<byte[]>> hmget(CommandBatching batching, byte[] key, List<byte[]> fields);



package lettuce;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.dynamic.RedisCommandFactory;
import io.lettuce.core.dynamic.batch.CommandBatching;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class HmgetTest {

    public void callHmget() throws Exception {

        RedisClient client = RedisClient.create();
        RedisURI uri = RedisURI.create("redis://localhost:6379");
        StatefulRedisConnection<byte[], byte[]> connection =  client.connect(new ByteArrayCodec(), uri);
        RedisCommandFactory factory = new RedisCommandFactory(connection);
        JavaPropertyCommands commands = factory.getCommands(JavaPropertyCommands.class);

        byte[] key = "key".getBytes();
        List<byte[]> fields = new ArrayList<>();
        fields.add("f1".getBytes());
        fields.add("f2".getBytes());

        RedisFuture<List<byte[]>> f = commands.hmget(CommandBatching.flush(), key, fields);

        List<byte[]> answer = f.get(2, TimeUnit.SECONDS);
        System.out.println(answer);
    }
}



Reply all
Reply to author
Forward
0 new messages