google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 'Unexpected EOF')",),))
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
class EventPublisher(PublisherClient):
    """Pub/Sub publisher client bound to a single topic.

    The topic is fixed at construction time, so callers only supply the
    message payload when publishing.
    """

    def __init__(self, topic, *args, **kwargs):
        """Store the target topic and initialise the underlying client.

        Args:
            topic: Topic identifier passed as the first argument of every
                ``PublisherClient.publish`` call made through this object.
            *args: Positional arguments forwarded to ``PublisherClient``.
            **kwargs: Keyword arguments forwarded to ``PublisherClient``.
        """
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        """Publish ``data`` to the topic this client was created with.

        Args:
            data: Message payload. ``bytes`` is sent unchanged; anything
                else must provide ``encode`` (e.g. ``str``) and is encoded
                as UTF-8.
            **kwargs: Extra keyword arguments forwarded to
                ``PublisherClient.publish``.

        Returns:
            Whatever ``PublisherClient.publish`` returns.
        """
        # Already-encoded payloads used to fail with AttributeError because
        # bytes has no ``encode``; pass them through untouched instead.
        payload = data if isinstance(data, bytes) else data.encode("utf-8")
        return super().publish(self._topic, data=payload, **kwargs)
class PubSub(DependencyProvider):
    """Dependency provider that hands workers a shared ``EventPublisher``.

    The publisher is created once in ``setup`` and the same instance is
    returned from every ``get_dependency`` call.
    """

    def __init__(self, topic=None, **options):
        """Record the optional topic override and any extra options.

        Args:
            topic: Topic name to publish to. When ``None`` the name is read
                from the container config in ``setup``.
            **options: Additional keyword options, stored on ``self.options``.
        """
        # ``setup`` tests ``self.topic is None``; without this assignment the
        # attribute did not exist and ``setup`` raised AttributeError.
        self.topic = topic
        self.options = options

    def setup(self):
        """Build the topic path, load credentials and create the client.

        Reads ``PROJECT``, ``CREDENTIALS`` and (when no topic was given to
        the constructor) ``TOPIC`` from the ``PUBSUB`` section of the
        container config.

        Raises:
            KeyError: If a required config key is missing.
        """
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(
            config["CREDENTIALS"]
        )
        self.client = EventPublisher(self.topic_path, credentials=self.credentials)

    def get_dependency(self, worker_ctx):
        """Return the client created in ``setup``; ``worker_ctx`` is unused."""
        return self.client
Is it possible to have a dependency provider spawn a thread for the client, or is there some other way to avoid re-authenticating and losing the client's batching and state across requests?
Using setup() in the dependency provider seems to be the answer here.
However, this does not seem to have fixed my errors:
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(SSLError("bad handshake: SysCallError(-1, 'Unexpected EOF')",),))
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from streaming.concurrency import ThreadSafeWrapper
class EventPublisher(PublisherClient):
    """Pub/Sub publisher client bound to a single topic.

    The topic is fixed at construction time, so callers only supply the
    message payload when publishing.
    """

    def __init__(self, topic, *args, **kwargs):
        """Store the target topic and initialise the underlying client.

        Args:
            topic: Topic identifier passed as the first argument of every
                ``PublisherClient.publish`` call made through this object.
            *args: Positional arguments forwarded to ``PublisherClient``.
            **kwargs: Keyword arguments forwarded to ``PublisherClient``.
        """
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        """Publish ``data`` to the topic this client was created with.

        Args:
            data: Message payload. ``bytes`` is sent unchanged; anything
                else must provide ``encode`` (e.g. ``str``) and is encoded
                as UTF-8.
            **kwargs: Extra keyword arguments forwarded to
                ``PublisherClient.publish``.

        Returns:
            Whatever ``PublisherClient.publish`` returns.
        """
        # Already-encoded payloads used to fail with AttributeError because
        # bytes has no ``encode``; pass them through untouched instead.
        payload = data if isinstance(data, bytes) else data.encode("utf-8")
        return super().publish(self._topic, data=payload, **kwargs)
class PubSub(DependencyProvider):
    """Dependency provider exposing a wrapped ``EventPublisher`` to workers.

    A single publisher is built in ``setup``, wrapped in
    ``ThreadSafeWrapper`` and returned from every ``get_dependency`` call.
    """

    def __init__(self, topic=None, **options):
        """Keep the optional topic override and any extra keyword options."""
        self.topic, self.options = topic, options

    def setup(self):
        """Resolve the topic path, load credentials and create the client.

        Values come from the ``PUBSUB`` section of the container config; the
        ``TOPIC`` entry is consulted only when no topic was passed to the
        constructor.
        """
        settings = self.container.config["PUBSUB"]
        project = settings["PROJECT"]

        # Fall back to the configured topic only when none was supplied.
        if self.topic is None:
            self.topic = settings["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"

        key_file = settings["CREDENTIALS"]
        self.credentials = service_account.Credentials.from_service_account_file(key_file)

        publisher = EventPublisher(self.topic_path, credentials=self.credentials)
        self.client = ThreadSafeWrapper(publisher)

    def get_dependency(self, worker_ctx):
        """Return the wrapped client built in ``setup``; ``worker_ctx`` is unused."""
        return self.client
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from google.oauth2 import service_account
from eventlet.queue import Queue
class EventPublisher(PublisherClient):
    """Pub/Sub publisher client bound to a single topic.

    The topic is fixed at construction time, so callers only supply the
    message payload when publishing.
    """

    def __init__(self, topic, *args, **kwargs):
        """Store the target topic and initialise the underlying client.

        Args:
            topic: Topic identifier passed as the first argument of every
                ``PublisherClient.publish`` call made through this object.
            *args: Positional arguments forwarded to ``PublisherClient``.
            **kwargs: Keyword arguments forwarded to ``PublisherClient``.
        """
        self._topic = topic
        super().__init__(*args, **kwargs)

    def publish(self, data, **kwargs):
        """Publish ``data`` to the topic this client was created with.

        Args:
            data: Message payload. ``bytes`` is sent unchanged; anything
                else must provide ``encode`` (e.g. ``str``) and is encoded
                as UTF-8.
            **kwargs: Extra keyword arguments forwarded to
                ``PublisherClient.publish``.

        Returns:
            Whatever ``PublisherClient.publish`` returns.
        """
        # Already-encoded payloads used to fail with AttributeError because
        # bytes has no ``encode``; pass them through untouched instead.
        payload = data if isinstance(data, bytes) else data.encode("utf-8")
        return super().publish(self._topic, data=payload, **kwargs)
class ThreadSafeClient:
    """Client facade that enqueues messages instead of publishing them.

    ``publish`` only places the message on ``self.queue``; something else
    must take items off the queue and send them.
    """

    def __init__(self):
        """Create the empty queue that holds messages awaiting publication."""
        pending = Queue()
        self.queue = pending

    def publish(self, message):
        """Put ``message`` on the queue; returns ``None``."""
        enqueue = self.queue.put
        enqueue(message)
class PubSub(DependencyProvider):
    """Dependency provider that publishes from one dedicated managed thread.

    Workers receive a ``ThreadSafeClient`` whose ``publish`` only enqueues
    the message. A thread spawned in ``start`` drains that queue and calls
    the real ``EventPublisher``.
    """

    def __init__(self, topic=None, **options):
        """Record the optional topic override and any extra options.

        Args:
            topic: Topic name to publish to. When ``None`` the name is read
                from the container config in ``setup``.
            **options: Additional keyword options, stored on ``self.options``.
        """
        self.topic = topic
        self.options = options
        # Initialise lifecycle state up front so ``stop`` is safe to call
        # even when ``setup``/``start`` never ran (previously AttributeError).
        self.safe_client = None
        self.client = None
        self._gt = None

    def _run(self):
        """Drain the queue, publishing each item until a ``None`` sentinel."""
        while True:
            item = self.safe_client.queue.get()
            if item is None:
                # Sentinel enqueued by ``stop``: leave the loop.
                break
            self.client.publish(item)
            # Drop the reference so the message is not kept alive while the
            # loop blocks on the next ``get()``.
            del item

    def start(self):
        """Spawn the managed thread that runs the publishing loop."""
        self._gt = self.container.spawn_managed_thread(self._run)

    def stop(self):
        """Ask the publishing loop to finish and wait for it to exit.

        Does nothing when no thread is running, so repeated calls (or a call
        without a prior ``start``) are harmless.
        """
        if self._gt is None:
            return
        self.safe_client.queue.put(None)
        self._gt.wait()
        self._gt = None

    def setup(self):
        """Build the topic path, load credentials and create both clients.

        Reads ``PROJECT``, ``CREDENTIALS`` and (when no topic was given to
        the constructor) ``TOPIC`` from the ``PUBSUB`` section of the
        container config.

        Raises:
            KeyError: If a required config key is missing.
        """
        config = self.container.config["PUBSUB"]
        project = config["PROJECT"]
        if self.topic is None:
            self.topic = config["TOPIC"]
        self.topic_path = f"projects/{project}/topics/{self.topic}"
        self.credentials = service_account.Credentials.from_service_account_file(
            config["CREDENTIALS"]
        )
        self.safe_client = ThreadSafeClient()
        self.client = EventPublisher(self.topic_path, credentials=self.credentials)

    def get_dependency(self, worker_ctx):
        """Return the queue-backed client; ``worker_ctx`` is unused."""
        return self.safe_client