import logging
import time
import os
from google.cloud import pubsub_v1
# Pub/Sub settings, read from the environment once at import time.
# PROJECT_ID and SUBSCRIPTION are None when their variables are unset.
PROJECT_ID = os.environ.get('PROJ_ID')
SUBSCRIPTION = os.environ.get('PS_SUBS')
# Batch size passed to pull() as max_messages; 0 when MSGCOUNT is unset.
# NOTE(review): Pub/Sub presumably rejects a non-positive max_messages --
# confirm and make sure MSGCOUNT is always set in deployment.
MESSAGES_COUNT = int(os.environ.get('MSGCOUNT', '0'))

# Mutable polling state shared with Process() through `global`.
# Delay (seconds) the polling loop sleeps after an unsuccessful pull.
sleep_time = 8
# Ack IDs collected for the most recently handled batch.
msg_ids = list()
def Process():
    """Pull one batch from the Pub/Sub subscription and acknowledge it.

    Reads the module-level ``PROJECT_ID``, ``SUBSCRIPTION`` and
    ``MESSAGES_COUNT`` settings and updates two module-level variables:
    ``sleep_time``, the back-off delay the polling loop sleeps for after an
    unsuccessful call, and ``msg_ids``, the ack IDs of the batch handled last.

    Returns:
        bool: True when messages were pulled and acknowledged (``sleep_time``
        is reset to 8). False when the pull raised or came back empty
        (``sleep_time`` is doubled, never exceeding 256).

    Raises:
        Exception: whatever ``subscriber.acknowledge`` raises; only the pull
        itself is guarded.
    """
    global sleep_time, msg_ids
    max_sleep = 256

    # Clamp first in case the value was pushed past the cap elsewhere.
    sleep_time = min(sleep_time, max_sleep)
    if msg_ids is not None and len(msg_ids) > 1000:
        msg_ids.clear()

    # The pull raises when nothing arrives in time (e.g. "504 Deadline
    # Exceeded"), so a failure here is treated as "no messages" and backed off.
    # NOTE(review): a new SubscriberClient is built on every call and never
    # closed -- consider reusing a single client; confirm against the
    # installed google-cloud-pubsub version.
    try:
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)
        response = subscriber.pull(subscription_path, max_messages=MESSAGES_COUNT)
    except Exception as e:
        logging.warning(e)
        # Cap AFTER doubling so the caller never sleeps longer than max_sleep.
        sleep_time = min(sleep_time * 2, max_sleep)
        return False

    msg_ids = []
    pubsub_messages = response.received_messages
    if not pubsub_messages:
        logging.warning('Nothing pulled form pubsub')
        sleep_time = min(sleep_time * 2, max_sleep)
        return False

    logging.info('Pulled %s messages from pubsub', len(pubsub_messages))
    # Set gives O(1) duplicate checks; msg_ids keeps the order for the ack call.
    seen_ack_ids = set()
    for msg in pubsub_messages:
        ack_id = msg.ack_id
        if ack_id in seen_ack_ids:
            continue
        # messages processed here
        seen_ack_ids.add(ack_id)
        msg_ids.append(ack_id)

    subscriber.acknowledge(subscription_path, msg_ids)
    # A successful batch resets the back-off to its initial delay.
    sleep_time = 8
    return True
if __name__ == "__main__":
    # Begin with the shortest back-off delay.
    sleep_time = 8
    # Poll forever: an unsuccessful Process() call is followed by a pause of
    # the current back-off delay, a successful one is retried immediately.
    while True:
        handled = Process()
        if handled:
            continue
        time.sleep(sleep_time)
pull(subscription, max_messages, return_immediately=None, retry=<object object>, timeout=<object object>, metadata=None)
Pulls messages from the server. The server may return UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
# Parameters:
- subscription (str) – The subscription from which messages should be pulled. Format is projects/{project}/subscriptions/{sub}.
- max_messages (int) – The maximum number of messages returned for this request. The Pub/Sub system may return fewer than the number specified.
- return_immediately (bool) – If this field is set to true, the system will respond immediately even if there are no messages available to return in the Pull response. Otherwise, the system may wait (for a bounded amount of time) until at least one message is available, rather than returning no messages.
- retry (Optional[google.api_core.retry.Retry]) – A retry object used to retry requests. If None is specified, requests will not be retried.
- timeout (Optional[float]) – The amount of time, in seconds, to wait for the request to complete. Note that if retry is specified, the timeout applies to each individual attempt.
- metadata (Optional[Sequence[Tuple[str, str]]]) – Additional metadata that is provided to the method.
subscriber = pubsub_v1.SubscriberClient(retry=None)
Hello Ridha,