[Python Client] SubscriberClient synchronous pull function issues


Ridha El Bekri

Jan 26, 2019, 9:54:56 AM
to Google Cloud Pub/Sub Discussions
Hello everyone,

I'm facing a strange issue with the Pub/Sub Python client. I have a Python loop set up (running in a GKE container) that continuously looks for messages in a Pub/Sub topic. The way it works: it uses a subscription to pull a batch of messages. If an exception occurs or no messages are returned, the process sleeps for an increasing (doubling) amount of time and then retries.

Here is my implementation:

import logging
import time
import os

from google.cloud import pubsub_v1


# PubSub service config
PROJECT_ID = os.getenv('PROJ_ID', None)
SUBSCRIPTION = os.getenv('PS_SUBS', None)
MESSAGES_COUNT = int(os.getenv('MSGCOUNT', 0))

sleep_time = 8

msg_ids = []

def Process():
    global sleep_time, msg_ids
    sleep_time = min(sleep_time, 256)

    if msg_ids is not None and len(msg_ids) > 1000:
        msg_ids.clear()

    # try catch in case of exception: 504 Deadline Exceeded (no messages to pull)
    try:
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)
        response = subscriber.pull(subscription_path, max_messages=MESSAGES_COUNT)
    except Exception as e:
        logging.warning(e)
        sleep_time *= 2
        return False

    msg_ids = []

    pubsub_messages = response.received_messages

    if len(pubsub_messages) == 0:
        logging.warning('Nothing pulled from pubsub')
        sleep_time *= 2
        return False

    else:
        logging.info('Pulled %s messages from pubsub' % str(len(pubsub_messages)))

        for msg in pubsub_messages:
            if msg.ack_id in msg_ids:
                continue

            """
            messages processed here
            """

            msg_ids.append(msg.ack_id)

        subscriber.acknowledge(subscription_path, msg_ids)
        sleep_time = 8
        return True


if __name__ == "__main__":
    sleep_time = 8
    while True:
        if not Process():
            time.sleep(sleep_time)
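
The back-off in Process() can be traced without touching Pub/Sub. This is a minimal stdlib sketch; backoff_schedule is a hypothetical helper, not part of the original code. It replays the sleep_time updates for consecutive failed or empty pulls. Because min(sleep_time, 256) runs before the doubling, the sleep appears to settle at 512 seconds rather than 256.

```python
def backoff_schedule(failures, start=8, cap=256):
    """Replay how Process() updates sleep_time on consecutive failures (hypothetical helper)."""
    sleep_time = start
    sleeps = []
    for _ in range(failures):
        sleep_time = min(sleep_time, cap)  # cap applied at the top of Process()
        sleep_time *= 2                    # doubled after an exception or an empty pull
        sleeps.append(sleep_time)          # the main loop then sleeps this long
    return sleeps


if __name__ == "__main__":
    print(backoff_schedule(8))  # prints [16, 32, 64, 128, 256, 512, 512, 512]
```

Note that this only covers the loop's own sleep. Each failing pull call can block for much longer before Process() even returns.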

When there were no messages, I was getting this log:

retry.retry_target: Retrying due to 504 Deadline Exceeded, sleeping ...

My guess was that it comes from an internal retry mechanism that kicks in when the pull fails to retrieve messages from the topic. In fact, the process completely freezes on the pull call until I finally get this exception:

main.PublishLogs: Deadline of 600.0s exceeded while calling functools.partial(<function _wrap_unary_errors.<locals>.error_remapped_callable at 0x7fba49fa8ae8>, subscription: "XXXX" max_messages: 1000 , metadata=[('x-goog-api-client', 'gl-python/3.7.2 grpc/1.18.0 gax/1.7.0 gapic/0.39.1')]), last exception: 504 Deadline Exceeded
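
The "freeze" is consistent with a client-side retry wrapper that keeps re-issuing the pull and backs off between attempts. It only raises once an overall deadline (600 s in the log above) runs out. The sketch below is a simplified, simulated-time model of that behavior, not the google.api_core.retry implementation. The names DeadlineExceeded, RetryDeadlineError and call_with_retry are local stand-ins. The attempt time, initial delay, multiplier and maximum delay are illustrative assumptions; only the 600 s deadline comes from the log.

```python
class DeadlineExceeded(Exception):
    """Local stand-in for the 504 error a single pull attempt raises."""


class RetryDeadlineError(Exception):
    """Raised by the model once the overall retry deadline is used up."""


def call_with_retry(func, deadline=600.0, attempt_time=10.0,
                    initial=1.0, multiplier=2.0, maximum=60.0):
    """Simplified, simulated-time model of a retry wrapper with an overall deadline.

    No real sleeping happens. Every parameter except the 600 s deadline is an
    illustrative assumption. Returns (result, attempts) on success.
    """
    elapsed, delay, attempts = 0.0, initial, 0
    while True:
        attempts += 1
        try:
            result = func()
        except DeadlineExceeded as exc:
            last_exc = exc
        else:
            return result, attempts
        elapsed += attempt_time  # simulated duration of the failed attempt
        if elapsed + delay > deadline:
            # Give up only when the next back-off would overrun the deadline.
            raise RetryDeadlineError(
                f"Deadline of {deadline}s exceeded after {attempts} attempts, "
                f"last exception: {last_exc}"
            )
        elapsed += delay  # simulated back-off before retrying
        delay = min(delay * multiplier, maximum)


if __name__ == "__main__":
    def always_504():
        raise DeadlineExceeded("504 Deadline Exceeded")

    try:
        call_with_retry(always_504)
    except RetryDeadlineError as err:
        print(err)
```

With these assumed parameters, an always-failing call gives up after 13 attempts, and the error reports the last 504. In the real client, every one of those attempts happens inside the single pull() call, so the caller sees nothing until the deadline is reached.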

So I went back to the docs (here) and figured that maybe I could force the client not to retry. According to the method definition, passing a retry object is optional, and if None is specified, the request will not be retried:

pull(subscription, max_messages, return_immediately=None, retry=<object object>, timeout=<object object>, metadata=None)
Pulls messages from the server. The server may return UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.

Parameters:
- subscription (str) – The subscription from which messages should be pulled. Format is projects/{project}/subscriptions/{sub}.
- max_messages (int) – The maximum number of messages returned for this request. The Pub/Sub system may return fewer than the number specified.
- return_immediately (bool) – If this field set to true, the system will respond immediately even if it there are no messages available to return in the Pull response. Otherwise, the system may wait (for a bounded amount of time) until at least one message is available, rather than returning no messages.
- retry (Optional[google.api_core.retry.Retry]) – A retry object used to retry requests. If None is specified, requests will not be retried.
- timeout (Optional[float]) – The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
- metadata (Optional[Sequence[Tuple[str, str]]]) – Additional metadata that is provided to the method.

So I told myself, who knows, maybe it's not None by default, and tried:

subscriber = pubsub_v1.SubscriberClient(retry=None)

This time it doesn't time out and retry until it freezes. Instead, it catches this exception: main.PublishLogs: __init__() got an unexpected keyword argument 'retry'

So far, I don't know what to do.

FYI: Python 3.7
requirements.txt: google-cloud-pubsub (version not pinned)


Thanks in advance for your help.

Tianzi Cai

Jan 28, 2019, 6:39:37 PM
Hi Ridha, how long is your ack deadline for messages in this subscription? 

Ridha El Bekri

Jan 29, 2019, 4:12:44 AM
Hi Tianzi, it's left at the default value, which is 10 seconds.

Efim (Cloud Platform Support)

Feb 20, 2019, 7:22:08 PM

Hello Ridha,


The documentation you quoted was for the pull method of the SubscriberClient object, not for the client's constructor. The retry argument should be passed to method calls such as subscriber.pull (line 28 of your code) rather than to the constructor (line 26). This should remove the error about the unexpected keyword argument.
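
Following that suggestion, here is a minimal sketch of a single pull with client-side retries disabled. It assumes the pull() signature quoted from the docs above, with subscription, max_messages, retry and timeout passed as keyword arguments. The helper name pull_without_client_retry and its 60-second default timeout are hypothetical. The subscriber is passed in, so the function itself needs no Pub/Sub import.

```python
import logging


def pull_without_client_retry(subscriber, subscription_path, max_messages, timeout=60.0):
    """Make one pull attempt with the client-side retry wrapper disabled (hypothetical helper).

    `subscriber` is assumed to be a pubsub_v1.SubscriberClient, or anything with
    a compatible pull() method. Returns the received messages, or [] on error.
    """
    try:
        response = subscriber.pull(
            subscription=subscription_path,
            max_messages=max_messages,
            retry=None,       # per the quoted docs: None means the request is not retried
            timeout=timeout,  # seconds to wait for this single attempt
        )
    except Exception as exc:  # e.g. a 504 Deadline Exceeded when nothing arrives
        logging.warning("pull failed: %s", exc)
        return []
    return list(response.received_messages)


# Hypothetical usage inside Process():
#   subscriber = pubsub_v1.SubscriberClient()  # no retry argument here
#   subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)
#   messages = pull_without_client_retry(subscriber, subscription_path, MESSAGES_COUNT)
```

With retry disabled, a pull that has nothing to deliver should fail after roughly `timeout` seconds instead of the 600 s seen in the log. The loop's own sleep-and-retry logic then takes over.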