Is there a way to make a Python HTTP to Pub/Sub frontend asynchronous yet also guarantee publish?

George Lu

Oct 25, 2017, 2:01:04 PM
to Google Cloud Pub/Sub Discussions
I'm trying to develop a PubSub frontend that only returns OK 200 once the message is successfully published into PubSub. What's the best way to accomplish this with the Python client library, yet also have the frontend be performant?

The Publisher guide mentions that "Asynchronous publishing allows for batching and higher throughput in your application." Is there a way to get both asynchronous publishing and a guaranteed publish with the Python client library?
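For concreteness, the async-publish pattern I mean looks roughly like this. This is a sketch, not the real client: `fake_publish` and the message IDs are stand-ins, and `concurrent.futures` simulates the client so it runs without google-cloud-pubsub installed; the real `publish()` likewise returns a future, and (as I understand it) Pub/Sub futures also support `add_done_callback`:

```python
import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

def fake_publish(data):
    # Stand-in for publisher.publish(topic_path, data=...); the real call
    # also returns a future that resolves to a message ID.
    return executor.submit(lambda: 'msg-id-for-' + data.decode())

published_ids = []

def on_publish_done(future):
    # Runs when the publish settles; future.result() re-raises here if the
    # publish failed, so failures are noticed even without blocking inline.
    published_ids.append(future.result())

future = fake_publish(b'payload')
future.add_done_callback(on_publish_done)
future.result()              # block only where a hard guarantee is needed
executor.shutdown(wait=True)
```

The point of the callback is that publishes can be batched and in flight concurrently while each outcome is still observed somewhere.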

(Or should I just learn golang or java and move over to use those client libraries instead?)

More details:

The python-docs-samples repo has an example with a frontend that returns OK 200 regardless of whether publisher.publish() succeeds, because the publish() call returns a future:


# [START index]
@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', messages=MESSAGES)

    data = request.form.get('payload', 'Example payload').encode('utf-8')
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(
        current_app.config['PROJECT'],
        current_app.config['PUBSUB_TOPIC'])
    publisher.publish(topic_path, data=data)
    return 'OK', 200
# [END index]



I'm currently using Python Flask, and my code looks like the following in order to guarantee that the message was published. But this defeats the purpose of being async. I'm trying different things, like wrapping my Flask app in uwsgi or gunicorn.

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return 'OK', 200

    data = request.form.get('payload', 'Example payload').encode('utf-8')

    try:
        future = publish_client.publish(topic, data)
        message_id = future.result()
        print('Published. Google Pub/Sub message ID: {}.'.format(message_id))
        return 'OK', 200
    except Exception as e:
        print("Exception that occurred:", e)
        raise
# [END index]
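One variant of the blocking handler above would cap the wait so a slow publish turns into a retryable error instead of a hung request. Sketched here with stdlib stand-ins (`fake_publish` simulates `publish_client.publish`, and the delays are artificial); with the real client the key piece is `future.result(timeout=...)`:

```python
import concurrent.futures
import time

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

def fake_publish(delay_s):
    # Stand-in for publish_client.publish(topic, data); the simulated
    # publish takes delay_s seconds to be acknowledged.
    def _send():
        time.sleep(delay_s)
        return 'message-id'
    return executor.submit(_send)

def handle_request(delay_s, timeout_s):
    future = fake_publish(delay_s)
    try:
        future.result(timeout=timeout_s)   # confirmed within the deadline
        return ('OK', 200)
    except concurrent.futures.TimeoutError:
        # Not confirmed in time: return a retryable status so the device
        # reposts. Note the original publish may still land later, so
        # retries can produce duplicates -- which is fine under
        # at-least-once semantics.
        return ('Try again', 503)
```

The thread still blocks for up to the timeout, so this bounds latency rather than restoring asynchronicity.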



Reference issues:


Jordan (Cloud Platform Support)

Oct 25, 2017, 4:39:40 PM
to Google Cloud Pub/Sub Discussions
To clarify, a 'frontend' is a frontend because it must accept and respond to client requests quickly (in order to scale quickly). By forcing your frontend to wait for a success from Pub/Sub before returning to your client, you are indeed removing all asynchronicity and hindering scalability. 

Therefore, by having your frontend perform the async call to Pub/Sub and then returning the future to your actual client in the response, you allow your clients to benefit from the async functionality. Your clients can then decide whether they want to poll Pub/Sub for a success, and send a request to your frontend again with the future. Your frontend would then quickly check the status and return the result to the client. In this case, your frontend is simply a proxy used for added security (to keep all service accounts and keys safe on your frontend) and authentication (to prevent non-authorized clients from accessing your Pub/Sub). 
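A rough, single-process sketch of that ticket-and-poll pattern. All names here (submit_publish, check_publish, the ticket scheme) are hypothetical, and concurrent.futures stands in for the Pub/Sub client:

```python
import concurrent.futures
import uuid

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
pending = {}   # ticket -> publish future (in-memory, single process only)

def submit_publish(data):
    # Frontend handler: kick off the publish and return a ticket at once.
    ticket = str(uuid.uuid4())
    # Stand-in for publish_client.publish(topic, data)
    pending[ticket] = executor.submit(lambda: 'message-id')
    return ticket

def check_publish(ticket):
    # Second endpoint: the client polls with its ticket.
    future = pending[ticket]
    if not future.done():
        return 'PENDING'
    return 'OK' if future.exception() is None else 'FAILED'
```

Note this only works if the poll reaches the same process that holds the future; across multiple frontend instances you would need sticky routing or shared state.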

- Note that Google Groups is meant for general product discussions and not for technical support. If you require further technical support for performing async calls in Python to Pub/Sub it is recommended to post your full detailed question to Stack Overflow using the supported Cloud tags. 

George Lu

Oct 26, 2017, 7:12:43 PM
to Google Cloud Pub/Sub Discussions
Hi Jordan, thanks for your response. Yes, I understand the importance of speed and scalability, but before scaling, I also need "at least once" guarantee.

I want to develop a frontend HTTP endpoint (with our domain name) where we can have the flexibility to tell potential customers something like, "Program your IoT device's firmware to POST to our frontend endpoint here, and as long as you get an OK 200 response, then the message is guaranteed to be in our system, if you don't get an OK 200, try reposting." 

I don't want our customers to need to handle the future (they shouldn't even know we are using Google Pub/Sub). And I also don't want to return an OK 200 to the customer's device without first receiving a confirmation that the message for sure got into Google's Pub/Sub system. (The reason we want to use Pub/Sub instead of rolling our own Kafka, RabbitMQ, or Celery is because we want a scalable, maintained, out-of-the-box product that ensures an "at-least-once" guarantee. But if my front end returns an OK 200 but we actually lost the message, then it kind of defeats the purpose.)

I am comfortable with scaling out as many frontend pods as needed, but I'm just trying to figure out the most efficient way to build that. Since I'm more familiar with Python than Golang or Java, I've been using the Pub/Sub Python client library inside Flask, with uwsgi or gunicorn as the web server (or something else, still deciding), then putting that behind nginx and running everything on Kubernetes (GKE). (I haven't done much concurrent, multi-threaded, or multi-processing programming before, hence my question about async vs. sync and what the possibilities are.)

I guess the ideal solution would be that I can still use async but somehow still guarantee that it got into PubSub before I return a 200 response, and my post was just trying to see if such a solution exists.

Kir Titievsky

Oct 27, 2017, 8:57:01 AM
to George Lu, Google Cloud Pub/Sub Discussions
George,

Thanks for the explanation. You have three options that I see:

- Use the "API client library" for Pub/Sub in a GAE instance. It does exactly what you want, as it is a thin layer on top of the REST API. You will find a link to it on the Pub/Sub client libraries page. The downside is that you are now responsible for retrying retryable errors: while Pub/Sub is highly available, you will occasionally have to retry a request before succeeding, or because a particular publish request took a long time. Or you can pass that on to the customer. You will also take a performance and cost hit overall, as well as not getting the same level of support (samples, docs, bug triage time) as you would with the standard client library. Note that this Python client in particular is not thread safe. But all that said, it works just fine in your scenario.

- Use the standard client library and block on the future in a GAE request. There is overhead in the request, but retries are handled, samples are up to date, and we will fix bugs.

- Use the standard client library in an environment happy with threads: GCE or GKE, say. You can have a single Pub/Sub client instance handle multiple requests, each blocking a response to the user on future resolution, if you wish, but concurrently. Because you have a persistent client, overhead does not matter, and you probably get lower machine cost overall. It is not as simple as GAE, depending on your background, so there is a bit more upfront development cost.
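A stdlib sketch of this third option, with a thread pool standing in for the single shared Pub/Sub client (all names and timings are placeholders): many request threads each block on their own future, and the waits overlap rather than serialize:

```python
import concurrent.futures
import time

# One long-lived "publisher" shared by all request threads (stand-in for a
# module-level pubsub_v1.PublisherClient()).
publish_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)

def publish(data):
    def _send():
        time.sleep(0.1)               # simulated network round trip
        return 'id-' + data.decode()
    return publish_pool.submit(_send)

def handle_request(data):
    # Each request blocks only its own thread on its own future.
    return publish(data).result()

# Simulate four concurrent HTTP requests.
request_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)
start = time.monotonic()
ids = list(request_pool.map(handle_request, [b'a', b'b', b'c', b'd']))
elapsed = time.monotonic() - start    # waits overlap: ~0.1s, not ~0.4s
```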

Let me know what you decide.

k
--
You received this message because you are subscribed to the Google Groups "Google Cloud Pub/Sub Discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-pubsub-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-pubsub-discuss/680df1b8-341e-44f8-8845-20b9ea7ac6c9%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--

Kir Titievsky | Product Manager | Google Cloud Pub/Sub

George Lu

Nov 14, 2017, 2:02:23 PM
to Google Cloud Pub/Sub Discussions
Thank you Kir.
I ended up starting with option 2 (using the client library and blocking on the future in a request) and then combined that with option 3 using Gunicorn's gthread worker. I put 1 worker with multiple threads in a Kubernetes pod. A Kubernetes service routed traffic to my pods, and I put an HPA autoscaler on the pods to scale them as load increased.

My Flask app code was essentially unchanged from my original post. Gunicorn's gthread worker runs requests on a thread pool (the concurrent.futures model, which is also what the Python Pub/Sub client's future follows). Between the following 2 lines of code, where most of the IO waiting occurred while the Pub/Sub future resolved, the gthread worker handled other requests on other threads, and when Pub/Sub did return, each thread picked up where it left off and knew which respective client to send the OK 200 to.

        future = publish_client.publish(topic, data)
        message_id = future.result()


I had also tried gunicorn's gevent worker, but that worker did not work with grpc. Also, for some reason the pod didn't work well with more than 1 worker (it threw some errors), so I stuck with just 1 worker per container and 1 container per pod, with 16 or 32 threads per worker (which I found to be the most efficient in messages per CPU/RAM used).
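For reference, that setup corresponds to a gunicorn invocation along these lines (the module path app:app and the bind address are placeholders):

```shell
# 1 worker, gthread worker class, 16 threads per worker
gunicorn --workers 1 --worker-class gthread --threads 16 \
    --bind 0.0.0.0:8080 app:app
```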

I need to do more end-to-end testing, but with this setup I think I was able to move away from being inefficiently IO bound. Thanks for your help and suggestions!