Calling an RPC service within a dependency


Chris Platts

Jan 13, 2018, 2:24:35 PM
to nameko-dev
Hi,

I have a LogService which receives log and exception info via RPC in the Graylog GELF format and then posts it to Graylog via AMQP.

Now, I'm trying to write a dependency for my other services.  The idea is that this dependency provides methods to log at various levels (e.g. log_info(msg), log_warn(msg) etc., and also collects exception info (via worker_result).  The dependency would then build an appropriate GELF object and send it to the log service via RPC.

I'm struggling making the call to the @rpc entrypoint from inside the dependency.  Any tips appreciated!

Thanks,
Chris

Matt Yule-Bennett

Jan 13, 2018, 3:13:23 PM
to nameko-dev
Nice. I've been waiting for someone to write a GELF DependencyProvider :)

Is there a reason the DP has to make RPC calls to your logging service rather than just publishing AMQP messages directly?

You may also want to take a look at https://github.com/Overseas-Student-Living/nameko-tracer, which you could probably use by simply plugging in a GELF log handler.

Chris Platts

Jan 13, 2018, 3:48:48 PM
to nameko-dev
Thanks, Matt -- I'll take a look through the tracer to see how it works.

I suppose there's no harm putting the GELF-over-AMQP stuff into the DP rather than a separate service.  Would I still be able to use nameko.messaging.Publisher inside a DP to handle the AMQP stuff, or would an AMQP client have to be handled in the DP's own code?

Thanks,
Chris

Chris Platts

Jan 13, 2018, 5:27:39 PM
to nameko-dev
...answered my own question - looks like dependencies are injected into services, but not into each other. Naively trying to use Publisher() in my Logger DP leads to:

`TypeError: bind() missing 1 required positional argument: 'attr_name'`

So would I be better off importing amqplib or somesuch and using it directly in the DP?


Matt Yule-Bennett

Jan 14, 2018, 6:22:49 AM
to nameko-dev
On Saturday, 13 January 2018 22:27:39 UTC, Chris Platts wrote:
...answered my own question - looks like dependencies are injected into services, but not into each other. Naively trying to use Publisher() in my Logger DP leads to:

`TypeError: bind() missing 1 required positional argument: 'attr_name'`


Right, DependencyProviders are specifically about injecting dependencies into Nameko services -- you can't put a DP inside another DP.
 
So would I be better off importing amqplib or somesuch and using it directly in the DP?


Actually, if you look at the nameko.messaging.Publisher DP, you'll see that (apart from inheriting from DependencyProvider and implementing get_dependency()) it doesn't do very much except create an instance of nameko.amqp.publish.Publisher -- a utility class that makes it easier to publish messages to RabbitMQ. I would use this rather than amqplib or kombu directly.
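
Something like this, for reference -- the URI, exchange, and routing key below are made-up placeholders:

from nameko.amqp.publish import Publisher

publisher = Publisher(
    "amqp://guest:guest@localhost:5672//",  # placeholder broker URI
    exchange="logs",       # placeholder exchange name
    routing_key="gelf",    # placeholder routing key
    serializer="json",
)

# publishes the payload to the "logs" exchange as JSON
publisher.publish({"short_message": "hello graylog"})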

When I directed you to nameko-tracer I forgot that the critical part of actually publishing messages to AMQP was spun into another (currently closed-source) library. If you choose to use the tracer, you'll want to plug the AMQP publishing part in as a Python logging handler.

Here's a skeleton implementation:

import logging

from nameko.amqp.publish import Publisher

logger = logging.getLogger(__name__)


class PublisherHandler(logging.Handler):
    """ Handler for publishing trace messages to RabbitMQ
    """
    def __init__(
        self, amqp_uri, exchange_name, routing_key,
        serializer=None, content_type=None
    ):
        # nameko.amqp.publish.Publisher handles the connection to RabbitMQ
        self.publisher = Publisher(
            amqp_uri,
            exchange=exchange_name,
            routing_key=routing_key,
            serializer=serializer,
            content_type=content_type
        )
        super(PublisherHandler, self).__init__()

    def emit(self, log_record):
        try:
            # format the record and publish it to RabbitMQ; fall back to
            # logging's default error handling if the publish fails
            self.publisher.publish(self.format(log_record))
        except Exception:
            self.handleError(log_record)

Use by registering it as a handler using the normal logging configuration:

LOGGING:
  version: 1
  formatters:
    tracer:
      (): yourlib.logging.formatters.GELFFormatter
  handlers:
    tracer:
      formatter: tracer
      (): yourlib.logging.handlers.PublisherHandler
      amqp_uri: <GELF_AMQP_URI>
      exchange_name: <GELF_EXCHANGE_NAME>
      routing_key: <GELF_ROUTING_KEY>
      serializer: json
      content_type: application/json
  loggers:
    nameko_tracer:
      level: INFO
      handlers:
        - tracer

I would personally do it this way, implementing the GELF format as a standard logging formatter too. If there's anything required by Graylog which is not already collected by the tracer, I'm sure a PR to nameko-tracer would be gratefully received. 
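
If you do write the formatter yourself, it can be very small. A sketch only (field names follow GELF 1.1; the mapping from Python logging levels to syslog severities is omitted; it returns a dict so that the handler's JSON serializer above does the encoding):

import logging
import socket


class GELFFormatter(logging.Formatter):
    """ Sketch of a formatter building GELF 1.1 payloads
    """
    def format(self, record):
        # return a dict rather than a string: the PublisherHandler above
        # passes this straight to publish(), which serializes it as JSON
        return {
            "version": "1.1",
            "host": socket.gethostname(),
            "short_message": record.getMessage(),
            "timestamp": record.created,
            # GELF expects syslog severities; a proper mapping from
            # Python logging levels is left out of this sketch
            "level": record.levelno,
            "_logger": record.name,
        }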

 

brooke...@gmail.com

Jan 15, 2018, 6:55:48 PM
to nameko-dev
Hi, Matt

I'm in a similar situation where I'm trying to capture all logging output (including from nameko-tracer) and send it to a specific service, although this service writes it to a PostgreSQL db rather than sending it to Graylog.

Each of our databases is 'owned' by a single service, so in theory all logs would be inserted into the logging (service-information) database via the logging service itself, since that service owns the data domain and handles all interactions with that database.

Is there a good way to configure the logging to send all logs to a queue that can be processed by a service? Is there a better approach?

(I'm currently working off the example you've provided for sending to Graylog, but I wondered if you had any opinions on sending traffic to a nameko service.)

Thank you for your help

Matt Yule-Bennett

Jan 16, 2018, 2:20:59 AM
to nameko-dev
Hi Brooke,

I would recommend almost the same solution as suggested above. In your setup though, the AMQP publisher would publish messages to be consumed by your logging service, rather than a Graylog consumer. You can use the Consumer entrypoint in nameko.messaging to consume messages from an arbitrary queue, and then simply write them to your logging database. This would be asynchronous messaging rather than RPC, but I think in this logging scenario a reply is not desired. 
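
Something like this (the exchange, queue, and service names are placeholders -- match them to whatever your logging handler publishes to):

from kombu import Exchange, Queue
from nameko.messaging import consume

log_exchange = Exchange("logs", type="topic")
log_queue = Queue("service_logs", exchange=log_exchange, routing_key="gelf")


class LogWriterService:
    name = "log_writer"

    @consume(log_queue)
    def persist(self, payload):
        # write the log record to your logging database here
        ...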

Incidentally, the above pattern is exactly what we did at Student.com for a while -- using a separate Nameko service to consume AMQP messages and write to a database (Elasticsearch in our case). We have since ditched the intermediate service in favour of writing to ES directly from the tracer. While I strongly advocate the one-database-per-service rule for domain data, I think logging is a separate concern and it's fine for the repository of those logs to be shared. This is just my opinion with no knowledge of what your logging / service-information service is actually doing though, so take it with a pinch of salt.

Hope that helps,
Matt.

Brooke Lynne Weaver Skousen

Jan 19, 2018, 2:31:54 PM
to nameko-dev
I forgot to respond a couple days ago; this helped a lot, thank you. We've got it working exactly how we want it now. I appreciate your input.

Chris Platts

Jan 29, 2018, 6:10:16 AM
to nameko-dev
On Friday, 19 January 2018 19:31:54 UTC, Brooke Lynne Weaver Skousen wrote:
I forgot to respond a couple days ago; this helped a lot, thank you. We've got it working exactly how we want it now. I appreciate your input.

...and I forgot too :)

Ended up using Publisher within a DP.  The DP does a handful of things:

* Builds GELF messages with a few standard extra properties -- e.g. _serviceName, _callID, _hostName, etc.
* Logs service lifecycle events via GELF-over-AMQP
* Provides log.info(), log.debug(), log.error(), log.warn(), log.fatal() functions to the service code.

However, I'm wondering if I should copy the tracer's approach and use Python's logging library.  That would give config-file based setup of targets and levels.  Then again, using Graylog makes me think that perhaps I shouldn't worry about such things -- I'm quite happy for all log data to always be sent.

(we're using Nameko due to the architectural advantages, not because we handle hundreds of calls per second, so log data is a stream rather than a flood!)

Chris Platts

Feb 22, 2018, 3:40:46 PM
to nameko-dev
Well, this was all going well until it wasn't!

I think I'm being naive when creating the kombu AMQP connection/queue in the logging dependency.  Once we have several workers active, we start getting exceptions regarding multiple writes to a file descriptor (apologies, I'm paraphrasing).  

The dependency's creating a single queue in its setup() method, which I now realise is being shared by all workers.  I think the right thing to do is refactor this so that a queue connection is created for each worker.  Perhaps set up the connection in worker_setup() and store it in a shared dictionary for inclusion in the object returned by get_dependency(), then clean it up in worker_result().

I'll see how that goes!

Matt Yule-Bennett

Feb 25, 2018, 2:41:32 AM
to nameko-dev


On Thursday, February 22, 2018 at 8:40:46 PM UTC, Chris Platts wrote:
Well, this was all going well until it wasn't!

I think I'm being naive when creating the kombu AMQP connection/queue in the logging dependency.  Once we have several workers active, we start getting exceptions regarding multiple writes to a file descriptor (apologies, I'm paraphrasing).  


"Second simultaneous write detected"?

This is Eventlet protecting you from multiple threads writing to the same file descriptor (probably a socket in this case) at the same time. As per http://nameko.readthedocs.io/en/stable/writing_extensions.html#concurrency-and-thread-safety, whatever is returned from get_dependency() needs to be safe for concurrent access by multiple worker threads.
 
The dependency's creating a single queue in its setup() method, which I now realise is being shared by all workers.

In AMQP you publish to exchanges rather than queues. RabbitMQ has a shortcut that allows you to publish a message into a named queue, but really what it's doing is publishing to the default exchange, which is implicitly bound to every queue by its name.
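
To illustrate with kombu (the URI and queue name are made up):

from kombu import Connection

# "publishing to a queue" is really publishing to the default (unnamed)
# exchange, with the queue's name as the routing key
with Connection("amqp://guest:guest@localhost//") as connection:
    producer = connection.Producer()
    producer.publish(
        {"hello": "world"},
        exchange="",             # the default exchange
        routing_key="my_queue",  # the target queue's name
    )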

So the shared object is not the queue, but perhaps the connection being used by your publisher.

What's weird is that the built-in Nameko Publisher and EventDispatcher use a publisher shared between worker threads and don't have any protection against multiple access. This is fine if the write to the socket is atomic and I've never seen the lack of protection cause a problem.

Are you sending particularly large payloads?
 
I think the right thing to do is refactor this so that a queue connection is created for each worker.  Perhaps set up the connection in worker_setup() and store it in a shared dictionary for inclusion in the object returned by get_dependency() and then cleaned-up in worker_result().


This would work, but it would be slow because you're creating a new connection every time you publish a message. If you're going to do that, you may as well ignore worker_setup() and worker_teardown() and just create the connection in get_dependency(), or lazily when you publish. A more efficient way would be to protect the publisher with a lock, to serialise the concurrent access.
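
For example, a sketch of the lock approach (the exchange, routing key, and class names are made up):

import threading

from kombu import Exchange
from nameko.amqp.publish import Publisher
from nameko.extensions import DependencyProvider


class GelfLogger(DependencyProvider):
    """ Sketch: one publisher shared by all workers, serialised with a lock
    """
    def setup(self):
        amqp_uri = self.container.config['AMQP_URI']
        exchange = Exchange("logs", type="topic")  # placeholder exchange
        self.publisher = Publisher(
            amqp_uri, exchange=exchange, routing_key="gelf"
        )
        # under Nameko's eventlet monkey-patching this is a green lock,
        # serialising access from concurrent worker greenthreads
        self.lock = threading.Lock()

    def get_dependency(self, worker_ctx):
        def publish(payload):
            with self.lock:
                self.publisher.publish(payload)

        return publish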
 
I'll see how that goes! 

If you can post some code here and/or a stack trace we might be able to figure out what's going wrong. It may be a platform-specific problem.