Maintaining state in a dependency

Raymond A. Botha

Mar 10, 2018, 11:26:27 AM
to nameko-dev
Hi,

I'm using the Google PubSub client to publish messages in a nameko service supposed to handle a few hundred requests per second, using a custom entrypoint to forward incoming messages from a websockets connection.

Unfortunately the PubSub client doesn't cope well with being started up for each request: it needs to be able to batch requests and, basically, keep its internal state alive across workers.

Is it possible to have a dependency provider spawn a thread for the client, or is there some other way to avoid re-authenticating and losing the client's batching and state between requests?

Raymond A. Botha

Mar 10, 2018, 11:38:55 AM
to nameko-dev
Using setup() in the dependency provider seems to be the answer here.

However, this hasn't seemed to fix my errors:
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 'Unexpected EOF')",),))


My dependency provider:
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account


class EventPublisher(PublisherClient):
    def __init__(self, topic, *args, **kwargs):
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        # Publish to the fixed topic; PubSub expects bytes
        return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)


class PubSub(DependencyProvider):
    def __init__(self, topic=None, **options):
        # topic falls back to PUBSUB.TOPIC from the service config in setup()
        self.topic = topic
        self.options = options

    def setup(self):
        # setup() runs once per service container, so this client is shared by all workers
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])
        self.client = EventPublisher(self.topic_path, credentials=self.credentials)

    def get_dependency(self, worker_ctx):
        return self.client

The pubsub client has no problems publishing with the same configuration, in the same virtual environment, outside of nameko.

Matt Yule-Bennett

Mar 10, 2018, 5:17:58 PM
to nameko-dev

> Is it possible to have a dependency provider spawn a thread for the client, or some other way to avoid reauthentication and losing the client batching and state across requests?

Yes. Use self.container.spawn_managed_thread. For an example, check the nameko-slack dependency. The class using spawn_managed_thread there is an Extension, not a DependencyProvider, but the usage is the same.


On Saturday, March 10, 2018 at 4:38:55 PM UTC, Raymond A. Botha wrote:
> Using setup() in the dependency provider seems to be the answer here.
>
> However, this hasn't seemed to fix my errors:
> google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 'Unexpected EOF')",),))


setup() is called once when the service container starts, so this is the correct place to instantiate something that will be shared by all workers. Whatever is returned from get_dependency must be thread-safe, though. My guess is that the Google client object is not, which is why it's complaining about an SSL handshake failing.

If the client is not thread-safe, the options are to funnel all the actions through a single thread (nameko_sentry used to do this), or to try this convenience wrapper I've been experimenting with for situations like this, which serializes method calls on an object.
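A minimal sketch of the "serialize method calls" idea, assuming a plain `threading.Lock` is enough: `SerializedWrapper` is a hypothetical name and is not the wrapper linked above, and `Counter` is a toy object used only to show the effect. Every method call on the wrapped object runs under one shared lock, so two callers can never be inside the object at the same time.

```python
import threading
import time


class SerializedWrapper:
    """Hypothetical wrapper: every method call on `wrapped` holds one shared lock."""

    def __init__(self, wrapped):
        self._wrapped = wrapped
        self._lock = threading.Lock()

    def __getattr__(self, name):
        # Only called for names not found on the wrapper itself
        attr = getattr(self._wrapped, name)
        if not callable(attr):
            return attr  # plain attributes are returned as-is, without locking

        def locked_call(*args, **kwargs):
            with self._lock:
                return attr(*args, **kwargs)

        return locked_call


class Counter:
    """Toy object whose read-modify-write method is not thread-safe by itself."""

    def __init__(self):
        self.value = 0

    def incr(self):
        current = self.value
        time.sleep(0)  # yield to other threads to widen the race window
        self.value = current + 1


counter = SerializedWrapper(Counter())


def work():
    for _ in range(100):
        counter.incr()


threads = [threading.Thread(target=work) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 800: no increments are lost
```

The lock only protects each call individually; it does not help when one method leaves shared state behind that a later method reads.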

Raymond A. Botha

Mar 10, 2018, 5:36:12 PM
to nameko-dev
Thanks Matt, it must be a threading issue. I gave your convenience wrapper a shot like below:
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from streaming.concurrency import ThreadSafeWrapper



class EventPublisher(PublisherClient):
    def __init__(self, topic, *args, **kwargs):
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)


class PubSub(DependencyProvider):
    def __init__(self, topic=None, **options):
        self.topic = topic

        self.options = options

    def setup(self):
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])
        self.client = ThreadSafeWrapper(EventPublisher(self.topic_path, credentials=self.credentials))


    def get_dependency(self, worker_ctx):
        return self.client

However, this gave me the same error, full trace here: https://gist.github.com/raybotha/f5de2f07386f0ca845a6d6d201b070c8

Raymond A. Botha

Mar 10, 2018, 6:02:16 PM
to nameko-dev
I also get the same issue with my attempt at the queue approach you used in nameko-sentry:

from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from eventlet.queue import Queue



class EventPublisher(PublisherClient):
    def __init__(self, topic, *args, **kwargs):
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)


class ThreadSafeClient:
    def __init__(self):
        self.queue = Queue()

    def publish(self, message):
        self.queue.put(message)



class PubSub(DependencyProvider):
    def __init__(self, topic=None, **options):
        self.topic = topic
        self.options = options
        self._gt = None  # set in start(), checked in stop()

    def _run(self):
        while True:
            item = self.safe_client.queue.get()
            if item is None:
                break

            self.client.publish(item)
            del item

    def start(self):
        self._gt = self.container.spawn_managed_thread(
            self._run)

    def stop(self):
        self.safe_client.queue.put(None)

        if self._gt is not None:
            self._gt.wait()


    def setup(self):
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(config["CREDENTIALS"])

        self.safe_client = ThreadSafeClient()

        self.client = EventPublisher(self.topic_path, credentials=self.credentials)

    def get_dependency(self, worker_ctx):
        return self.safe_client


Matt Yule-Bennett

Mar 12, 2018, 4:00:01 AM
to nameko-dev
The convenience wrapper will only work if the object is safe between methods, i.e. shared state is not manipulated by one method and then read by another. I guess that's not the case here. It's more useful when you have some control over the thing you're wrapping.

The approach used by nameko-sentry should work, though. If it's implemented correctly, there can only be one thread interacting with the client object: the one spawned to run self._run. My only suggestion would be to instantiate the client in that thread too. According to https://developers.google.com/api-client-library/python/guide/thread_safety the credentials object is thread-safe, so you should be fine with a shared instance of that.
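A minimal sketch of the single-thread approach with that suggestion applied, using only the standard library so it runs on its own: `SingleThreadPublisher`, `client_factory` and `FakeClient` are hypothetical names. In the nameko version, `start()` would use `self.container.spawn_managed_thread` and the queue would be an `eventlet.queue.Queue`, as in Raymond's code; here a `threading.Thread` and `queue.Queue` stand in for them. The client is built inside the worker thread, so no other thread ever touches it; only the credentials would be shared.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the worker thread to exit


class SingleThreadPublisher:
    """Hypothetical sketch: funnel every publish through one dedicated thread."""

    def __init__(self, client_factory):
        # client_factory: assumed zero-argument callable that builds the real client
        self._client_factory = client_factory
        self._queue = queue.Queue()
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def publish(self, message):
        # Safe to call from any thread: it only enqueues and never touches the client
        self._queue.put(message)

    def stop(self):
        self._queue.put(_STOP)
        if self._thread is not None:
            self._thread.join()

    def _run(self):
        # The client is created in, and only ever used by, this worker thread
        client = self._client_factory()
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            client.publish(item)


class FakeClient:
    """Toy client that records which thread created it and what it published."""

    def __init__(self):
        self.created_in = threading.get_ident()
        self.sent = []

    def publish(self, message):
        self.sent.append(message)


created = []


def make_client():
    client = FakeClient()
    created.append(client)
    return client


publisher = SingleThreadPublisher(make_client)
publisher.start()
for i in range(5):
    publisher.publish(f"event-{i}")
publisher.stop()

print(created[0].sent)  # ['event-0', 'event-1', 'event-2', 'event-3', 'event-4']
```

Because the queue is FIFO and the stop sentinel goes in last, messages queued before `stop()` are still published before the worker exits.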

Raymond A. Botha

Mar 12, 2018, 11:00:37 AM
to nameko-dev
Thanks Matt. I've tried to isolate the client in its own thread as much as I can think of, as you suggested, but without luck. This is the provider code now: https://gist.github.com/raybotha/362b661b02c5b95cdfac5bca10fbc0c9
Strangely, the same SSL threading issue still occurs, most likely because the Google client auth uses httplib2 (https://github.com/google/oauth2client/blob/3071457064f3705bab1b041bd624a10d5a2d2619/oauth2client/transport.py#L17).
This issue is discussed here, but without much context or relation to PubSub usage; I'll have to do some hacking on it.

Matt Yule-Bennett

Mar 13, 2018, 11:09:06 AM
to nameko-dev
I don't think it can be a threading issue anymore. Your DependencyProvider is serialising all the publishes into a single thread and looks correct.

You did need to make your DP thread-safe, but I no longer think that is the root cause of your problem.

Have you read through https://github.com/requests/requests/issues/2022? I know you're not using requests but there's a lot of good info in there about OpenSSL and possible handshake failures.

One experiment to try would be using the client outside of Nameko but with the Eventlet monkey patch applied. It may be that there's a bug in the green implementation of OpenSSL.

Raymond A. Botha

Mar 13, 2018, 11:56:22 AM
to nameko-dev
Just applying a simple monkey patch to the regular test code isn't causing any trouble: https://gist.github.com/raybotha/2d9d1119f1436ff34071d4ce3a041f51

Interestingly, it's also working with this greenthread implementation accessing the client in the main thread: https://gist.github.com/raybotha/c7f193fce0a2c06418f03defe4a33687

I'll take a look at some OpenSSL setups; this has to be related somehow to how httplib2 uses OpenSSL.

Raymond A. Botha

Mar 13, 2018, 12:01:00 PM
to nameko-dev
Spawning 100 greenthreads that publish messages doesn't cause any trouble either: https://gist.github.com/raybotha/d4df2d69c12d86ce357a6bd0785cdbdc

Matt Yule-Bennett

Mar 14, 2018, 12:16:42 PM
to nameko-dev
You have to apply the monkey patch **before** your other imports; otherwise it doesn't take effect.
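A simplified, standard-library illustration of why the order matters, assuming the patch works by replacing attributes on already-imported modules (roughly what `eventlet.monkey_patch()` does for modules such as `socket`): a name bound with `from module import name` before the patch keeps pointing at the original object. `fake_socket`, `BlockingSocket`, `GreenSocket` and `monkey_patch` here are hypothetical stand-ins. In a real script this means calling `eventlet.monkey_patch()` as the very first statement, before importing the Google client libraries.

```python
import sys
import types

# Hypothetical stand-in for a stdlib module such as `socket`
fake_socket = types.ModuleType("fake_socket")


class BlockingSocket:
    kind = "blocking"


fake_socket.socket = BlockingSocket
sys.modules["fake_socket"] = fake_socket

# A library imported BEFORE patching binds the original class to its own name
from fake_socket import socket as socket_bound_early


class GreenSocket:
    kind = "green"


def monkey_patch():
    # Simplified patch: replace the attribute on the already-imported module
    sys.modules["fake_socket"].socket = GreenSocket


monkey_patch()

# A library imported AFTER patching sees the replacement
from fake_socket import socket as socket_bound_late

print(socket_bound_early.kind)  # blocking
print(socket_bound_late.kind)   # green
```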

Raymond Botha

Mar 14, 2018, 1:05:30 PM
to nameko-dev
Ah yes, thanks. With the patch applied first, it breaks with the same error.

Hmm, it seems like my options are to raise an issue in the google-auth repo, and possibly attempt a fork of it with a different HTTP library.

Is it possible to run green threads alongside kernel threads? Or is it not possible to run non-eventlet-compatible code in a nameko service?

Matt Yule-Bennett

Mar 15, 2018, 6:06:06 AM
to nameko-dev
I think this should probably be considered a bug in Eventlet. Have you tried using different versions of Eventlet?

If it comes to it, it is possible to run native threads. See http://eventlet.net/doc/threading.html#tpool-simple-thread-pool

r...@invictuscapital.com

Apr 23, 2018, 3:15:37 PM
to nameko-dev
Thanks for the help last month, Matt, but I still haven't got this working, even with tpool.
I opened an issue with eventlet a month ago though with a simple snippet to reproduce the issue (https://github.com/eventlet/eventlet/issues/476).
It seems gRPC has added support for gevent in the latest release, but eventlet is still incompatible.

Matt Yule-Bennett

Apr 24, 2018, 2:05:55 AM
to nameko-dev
Damn. It's pretty unfortunate that the Google client libraries use a gRPC client under the hood.

You should be able to integrate it using the tpool though; wrapping eventlet-incompatible code is exactly its intended use-case. That will be a lot easier than getting things going with gevent ;)