...answered my own question - it looks like dependencies are injected into services, but not into each other. Naively trying to use Publisher() in my Logger DP leads to:
`TypeError: bind() missing 1 required positional argument: 'attr_name'`
So would I be better off importing amqplib or some such and using it directly in the DP?
```python
import logging

from nameko.amqp.publish import Publisher

logger = logging.getLogger(__name__)


class PublisherHandler(logging.Handler):
    """ Handler for publishing trace messages to RabbitMQ """

    def __init__(
        self, amqp_uri, exchange_name, routing_key,
        serializer=None, content_type=None
    ):
        self.publisher = Publisher(
            amqp_uri,
            exchange=exchange_name,
            routing_key=routing_key,
            serializer=serializer,
            content_type=content_type
        )
        super(PublisherHandler, self).__init__()

    def emit(self, log_record):
        try:
            self.publisher.publish(self.format(log_record))
        except Exception:
            self.handleError(log_record)
```
```yaml
LOGGING:
  formatters:
    tracer:
      (): yourlib.logging.formatters.GELFFormatter
  handlers:
    tracer:
      formatter: tracer
      (): yourlib.logging.handlers.PublisherHandler
      amqp_uri: <GELF_AMQP_URI>
      exchange_name: <GELF_EXCHANGE_NAME>
      routing_key: <GELF_ROUTING_KEY>
      serializer: json
      content_type: application/json
  loggers:
    nameko_tracer:
      level: INFO
      handlers:
        - tracer
```
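For anyone following along: nameko applies the `LOGGING` block from the service config via `logging.config.dictConfig`. Here's a minimal, runnable sketch of the same wiring using a stdlib `StreamHandler` as a stand-in (the real config would point the `()` key at the custom `PublisherHandler` above, which needs a broker):

```python
import logging
import logging.config

# Dict equivalent of the YAML LOGGING block; the "()" key names the
# handler factory. StreamHandler is a stand-in so this runs anywhere.
LOGGING = {
    "version": 1,
    "formatters": {
        "tracer": {"format": "%(levelname)s %(message)s"},
    },
    "handlers": {
        "tracer": {
            "formatter": "tracer",
            "()": "logging.StreamHandler",  # stand-in for PublisherHandler
        },
    },
    "loggers": {
        "nameko_tracer": {"level": "INFO", "handlers": ["tracer"]},
    },
}

logging.config.dictConfig(LOGGING)
tracer = logging.getLogger("nameko_tracer")
```

Any keyword arguments alongside `()` (like `amqp_uri` in the YAML) are passed to the factory, which is how `PublisherHandler.__init__` receives them.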
I forgot to respond a couple days ago; this helped a lot, thank you. We've got it working exactly how we want it now. I appreciate your input.
Well, this was all going well until it wasn't! I think I'm being naive when creating the kombu AMQP connection/queue in the logging dependency. Once several workers are active, we start getting exceptions about concurrent writes to the same file descriptor (apologies, I'm paraphrasing).
The dependency is creating a single queue in its setup() method, which I now realise is shared by all workers.
I think the right thing to do is refactor this so that a connection is created for each worker: set up the connection in worker_setup(), store it in a dictionary keyed by worker context, return it from get_dependency(), and clean it up when the worker finishes.
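A minimal sketch of that lifecycle, with a stand-in for the kombu connection so it runs without a broker (the real provider would subclass nameko's DependencyProvider and create a `kombu.Connection(amqp_uri)`; note nameko also offers worker_teardown(), which runs whether the worker succeeded or errored, so it may be a safer place for cleanup than worker_result()):

```python
from weakref import WeakKeyDictionary


class FakeConnection:
    """Stand-in for a kombu Connection (assumption: the real provider
    would create kombu.Connection(amqp_uri) here)."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class WorkerCtx:
    """Stand-in for nameko's WorkerContext."""


class PerWorkerPublisher:
    """Sketch of a dependency provider that gives each worker its own
    connection instead of sharing one created in setup()."""

    def __init__(self):
        # one connection per in-flight worker, keyed by worker context
        self.connections = WeakKeyDictionary()

    def worker_setup(self, worker_ctx):
        # create a dedicated connection for this worker
        self.connections[worker_ctx] = FakeConnection()

    def get_dependency(self, worker_ctx):
        # hand the worker its own connection
        return self.connections[worker_ctx]

    def worker_teardown(self, worker_ctx):
        # close and forget this worker's connection
        conn = self.connections.pop(worker_ctx, None)
        if conn is not None:
            conn.close()
```

Keying the dictionary on the worker context object (as a WeakKeyDictionary, so entries can't leak if teardown is missed) means concurrent workers never touch each other's connections, which should avoid the shared-file-descriptor writes.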
I'll see how that goes!