Pipeline execute returns null array

Eduardo Iglesias

Oct 9, 2022, 6:10:49 PM
to Redis DB
Hi, 

I have a process that runs MULTI/EXEC transactions intensively.
Each transaction executes commands such as INCRBY, DEL, and ZREM.
About 5000 keys are processed.

Meanwhile, other services execute many operations of their own.

Sporadically, the transaction fails and immediately returns a null array.

It can go 2 days without a problem, or fail 5 times in the same day.

Is there any limitation in Redis that would prevent this transaction from executing? A limit on the number of queued commands? On the memory buffer for the reply?

Regards.

Greg Andrews

Oct 9, 2022, 7:57:23 PM
to Redis DB
Have you investigated the behavior of your Redis client library and made completely sure this is not due to the library timing out while waiting for the Redis server to finish executing the commands?  I've never known the Redis server code to return an empty response - always a success or failure.

Eduardo Iglesias

Oct 9, 2022, 11:01:01 PM
to Redis DB
Yes. I send the commands through a pipeline over an asio socket, which has no execution timeout defined.
As I already mentioned, it works well for days, sometimes even months.
I associate the failures with load: when Redis is processing many commands from other services, the transaction fails even though it contains only a few commands.
Given this behavior, my conclusion so far is that the limit must be in the Redis engine.
As a workaround, I migrated the process to a Lua script to see whether that resolves the problem.
Any comments are welcome...

Eduardo Iglesias

Oct 9, 2022, 11:09:41 PM
to Redis DB
The EXEC command fails, returning *-1\r\n.

This is my Flush method:

bool Connection::Flush()
{
    if (!IsConnected()) {
        return false;
    }

    bool multiUsed = false;

    // Send
    string pipelinedMessage;
    for (list< boost::shared_ptr<RedisOp> >::const_iterator g = m_pendingCommands.begin(); g != m_pendingCommands.end(); ++g) {
        if ((*g)->IsMulti()) {
            if (multiUsed) continue; // Allow only the first multi
            multiUsed = true;
        }
        list<string> command = (*g)->GetCommand();
        pipelinedMessage += "*" + boost::lexical_cast<string>(command.size()) + "\r\n";
        for (list<string>::iterator c = command.begin(); c != command.end(); ++c) {
            pipelinedMessage += "$" + boost::lexical_cast<string>(c->size()) + "\r\n" + *c + "\r\n";
        }
    }
    if (multiUsed) {
        pipelinedMessage += "EXEC\r\n";
    }

    boost::system::error_code error;
    size_t sent = boost::asio::write(m_socket, boost::asio::buffer(pipelinedMessage.c_str(), pipelinedMessage.size()), boost::asio::transfer_all(), error);
    if (error || sent < pipelinedMessage.size()) {
        m_socket.close();
        return false;
    }

    bool multiReached = false;
    list< boost::shared_ptr<RedisOp> > afterMulti;

    // Receive
    for (list< boost::shared_ptr<RedisOp> >::const_iterator g = m_pendingCommands.begin(); g != m_pendingCommands.end(); ++g) {
        if (multiReached && (*g)->IsMulti()) {
            Response r;
            r.m_error = "Only one MULTI operation allowed at the same time";
            (*g)->SetResponse(r);
            continue;
        }

        Response r = ReadCommandResponse(m_socket);
        if (multiReached && r.m_error == "") {
            afterMulti.push_back(*g); // response will come after the exec.
        }
        else {
            (*g)->SetResponse(r);
        }

        if ((*g)->IsMulti()) {
            multiReached = true;
        }
    }
    if (multiReached) {
        // Get the exec and send the responses to the queued commands
        Response r = ReadCommandResponse(m_socket); // EXEC
        m_hasWatchs = false;
        if (r.m_gotNull) {
            // exec failed!
            m_pendingCommands.clear();
            m_hasWatchs = false;
            return false;
        }
        for (list<Response>::iterator er = r.m_multiValue.begin(); er != r.m_multiValue.end(); ++er) {
            if (afterMulti.empty()) {
                continue; // More responses arrived than commands sent.
            }
            afterMulti.front()->SetResponse(*er);
            afterMulti.pop_front();
        }
        for (list< boost::shared_ptr<RedisOp> >::iterator op = afterMulti.begin(); op != afterMulti.end(); ++op) {
            Response r;
            r.m_error = "No response for this operator arrived in EXEC response.";
            (*op)->SetResponse(r);
        }
    }
    m_pendingCommands.clear();
    // m_hasWatchs is only cleared upon an EXEC
    return true;
}
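
For reference, the `*-1\r\n` reply mentioned at the top of this post is the RESP2 null array. Redis sends it for EXEC when the transaction was aborted, which is different from `*0\r\n` (EXEC ran with no queued commands) and from an error reply such as `-EXECABORT`. A minimal sketch of telling these cases apart from the first line of the reply; `ExecOutcome` and `ClassifyExecReply` are hypothetical names, not part of the Connection class above:

```cpp
#include <cassert>
#include <cstdlib>
#include <string>

// Hypothetical helper type, not part of the posted Connection class.
enum class ExecOutcome { Aborted, Executed, NotAnArray };

// Inspects the first line of a RESP2 reply to EXEC.
//   "*-1\r\n"            -> null array: the transaction was aborted.
//   "*N\r\n" with N >= 0 -> EXEC ran and N replies follow.
//   anything else        -> not an array (for example an error reply).
ExecOutcome ClassifyExecReply(const std::string& reply) {
    assert(!reply.empty());
    const std::string::size_type eol = reply.find("\r\n");
    if (reply[0] != '*' || eol == std::string::npos) {
        return ExecOutcome::NotAnArray;
    }
    // Parse the signed element count that follows the '*'.
    const long count = std::strtol(reply.c_str() + 1, nullptr, 10);
    return count < 0 ? ExecOutcome::Aborted : ExecOutcome::Executed;
}
```

A null array therefore points at an aborted transaction rather than at a queue or buffer limit.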

Eduardo Iglesias

Oct 9, 2022, 11:19:31 PM
to Redis DB
The result looks as if I had a WATCH on some key that aborted the transaction, but I don't use WATCH in this type of operation.

Optimistic locking using check-and-set

WATCH is used to provide a check-and-set (CAS) behavior to Redis transactions.

WATCHed keys are monitored in order to detect changes against them. If at least one watched key is modified before the EXEC command, the whole transaction aborts, and EXEC returns a Null reply to notify that the transaction failed.
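
The quoted behavior can be illustrated with a toy in-memory model. This is only a sketch of the semantics, not how Redis implements WATCH internally; `ToyServer`, `ToyConnection`, and the version counters are all hypothetical. The point it shows is that watches belong to the connection and stay in place until EXEC or UNWATCH, whatever commands were run in between.

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy model only: these names do not exist in Redis or in the code above.
struct ToyServer {
    std::map<std::string, long> versions;  // bumped on every write to a key

    void Write(const std::string& key) { ++versions[key]; }

    long VersionOf(const std::string& key) const {
        const auto it = versions.find(key);
        return it == versions.end() ? 0 : it->second;
    }
};

struct ToyConnection {
    std::map<std::string, long> watched;  // key -> version seen at WATCH time

    void Watch(const ToyServer& server, const std::string& key) {
        assert(!key.empty());
        watched[key] = server.VersionOf(key);
    }

    void Unwatch() { watched.clear(); }

    // Returns false (the "null reply" case) if any watched key changed
    // since it was watched. The watches are dropped either way.
    bool Exec(const ToyServer& server) {
        bool clean = true;
        for (const auto& entry : watched) {
            if (server.VersionOf(entry.first) != entry.second) {
                clean = false;
            }
        }
        watched.clear();
        return clean;
    }
};
```

In this model, a `Watch` followed by a `Write` from elsewhere makes the next `Exec` on that connection fail, while calling `Unwatch` first lets it succeed.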

Marcelo Zimbres Silva

Oct 10, 2022, 2:19:47 AM
to redi...@googlegroups.com
Hi,
BTW, this looks like very low level code, where lots of things can go
wrong if a certain condition is met. Perhaps you could try a higher
level library, like Aedis (which I am the author of) where things will
be done automatically for you, for example:
https://github.com/mzimbres/aedis/blob/770e224917e4f3afea2b244cf1448d19f052873c/examples/intro.cpp#L25.

Marcelo

Greg Andrews

Oct 11, 2022, 1:16:55 PM
to Redis DB
I'm an infrastructure guy, with minimal trust in code.  I understand that the calls your main code made into the Redis client library/layer received a null response from that library/layer, but what did the Redis server actually return over the network?

Eduardo Iglesias

Oct 12, 2022, 8:37:12 AM
to Redis DB
Hi, I have solved the issue.
For the MULTI/EXEC, I was reusing a connection on which I had previously done a GET with WATCH.
At some point another process modified the watched key, which caused the transaction to abort.
Thanks
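
One way to guard against this when a connection is reused is to send UNWATCH before MULTI, since UNWATCH drops every key the connection is currently watching. A minimal sketch, assuming the same RESP encoding as the Flush method earlier in the thread; `EncodeCommand` and `BuildTransaction` are hypothetical helpers, not part of the original code:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Encodes one command as a RESP array of bulk strings.
std::string EncodeCommand(const std::vector<std::string>& args) {
    assert(!args.empty());
    std::string out = "*" + std::to_string(args.size()) + "\r\n";
    for (const std::string& arg : args) {
        out += "$" + std::to_string(arg.size()) + "\r\n" + arg + "\r\n";
    }
    return out;
}

// Hypothetical helper: builds one pipelined MULTI ... EXEC message.
// When dropStaleWatches is true, UNWATCH is sent first so that a WATCH left
// over from earlier use of the connection cannot abort this transaction.
std::string BuildTransaction(const std::vector<std::vector<std::string> >& commands,
                             bool dropStaleWatches) {
    std::string out;
    if (dropStaleWatches) {
        out += EncodeCommand({"UNWATCH"});
    }
    out += EncodeCommand({"MULTI"});
    for (const std::vector<std::string>& command : commands) {
        out += EncodeCommand(command);
    }
    out += EncodeCommand({"EXEC"});
    return out;
}
```

The extra UNWATCH adds one more reply (`+OK`) to read before the MULTI reply. Only do this when the transaction is not meant to depend on an earlier WATCH; the alternative is to use a separate connection for the WATCH-based code path.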