[erlang-questions] Message Loop in Gen_Server

161 views
Skip to first unread message

Lee Sylvester

unread,
Apr 17, 2013, 1:33:04 PM4/17/13
to erlang-questions@erlang.org Questions
Hey guys,

So, I've hit a "best practice" conundrum in OTP; I have a server utilising gen_server for a RabbitMQ consumer. In the init of that gen_server, I'm setting up a RabbitMQ connection, but I also need to start a loop. My guess was that I shouldn't call this before init exits, as I was passing the Connection and Channel objects to state for handling elsewhere. If I handle the loop in init, surely it will never return?

To simplify what I'm saying (as I'm confusing myself here), here's my code:

init([]) ->
{ok, Connection} = amqp_connection:start(#amqp_params_network{ host="localhost" }),
{ok, Channel} = amqp_connection:open_channel(Connection),
amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"user_msgs">>,
type = <<"direct">>}),
#'queue.declare_ok'{queue = Queue} =
amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
State = {Channel, Connection},
amqp_channel:call(Channel, #'queue.bind'{exchange = <<"user_msgs">>,
routing_key = term_to_binary(node(self())),
queue = Queue}),
amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
no_ack = true}, self()),
receive
#'basic.consume_ok'{} -> ok
end,
loop(Channel),
{ok, State}.

Now, if I don't put the loop in my init, then how can I be sure that the loop is called every time the gen_server restarts? Can someone please suggest the "right" way to call the loop in my gen_server?

Thanks loads,
Lee
_______________________________________________
erlang-questions mailing list
erlang-q...@erlang.org
http://erlang.org/mailman/listinfo/erlang-questions

Garrett Smith

unread,
Apr 17, 2013, 1:41:59 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
On Wed, Apr 17, 2013 at 12:33 PM, Lee Sylvester <lee.sy...@gmail.com> wrote:
> Hey guys,
>
> So, I've hit a "best practice" conundrum in OTP; I have a server utilising gen_server for a RabbitMQ consumer. In the init of that gen_server, I'm setting up a RabbitMQ connection, but I also need to start a loop. My guess was that I shouldn't call this before init exits, as I was passing the Connection and Channel objects to state for handling elsewhere. If I handle the loop in init, surely it will never return?
>
> To simplify what I'm saying (as I'm confusing myself here), here's my code:
>
> init([]) ->
> {ok, Connection} = amqp_connection:start(#amqp_params_network{ host="localhost" }),
> {ok, Channel} = amqp_connection:open_channel(Connection),
> amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"user_msgs">>,
> type = <<"direct">>}),
> #'queue.declare_ok'{queue = Queue} =
> amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
> State = {Channel, Connection},
> amqp_channel:call(Channel, #'queue.bind'{exchange = <<"user_msgs">>,
> routing_key = term_to_binary(node(self())),
> queue = Queue}),
> amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
> no_ack = true}, self()),
> receive
> #'basic.consume_ok'{} -> ok
> end,
> loop(Channel),
> {ok, State}.

Definitely not in init/1 -- that blocks the caller to start_link.

> Now, if I don't put the loop in my init, then how can I be sure that the loop is called every time the gen_server restarts? Can someone please suggest the "right" way to call the loop in my gen_server?

It's awkward, which is the point of e2_task -- see
http://e2project.org if you're curious -- but this is how:

init(Args) ->
% init here
{ok, State, 0}.

handle_info(timeout, State) ->
loop().

The 0 element in the init result will cause an immediate 'timeout'
message to be sent to the process, resulting in a handle_info/2
callback.

Garrett

John Kemp

unread,
Apr 17, 2013, 1:44:31 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
Perhaps something like this?:

init(...) ->
spawn_link(
module_name, loop, [options]
).

loop(...) ->
receive
exit -> ok ;
message ->
(your code here),
loop(...)
end.

You can then send your gen_server an exit message to make it stop whatever it's doing.

JohnK

Garrett Smith

unread,
Apr 17, 2013, 1:45:29 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
Incidentally, if this is a loop that runs for the life of the process,
it's easier to just run a function using proc_lib:

proc_lib:start_link(M, F, A)

See http://erlang.org/doc/man/proc_lib.html for details.

You can setup your rabbit client at the start of the loop and just
loop thereafter.

Lee Sylvester

unread,
Apr 17, 2013, 1:46:27 PM4/17/13
to Garrett Smith, erlang-questions@erlang.org Questions
Thank you. I've actually seen that done in the Erlang in Action book. Didn't think of it myself :-) You're a star!

Regards,
Lee

JD Bothma

unread,
Apr 17, 2013, 1:47:38 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
Can I ask what your never-exiting loop is doing?

what is the purpose of the gen_server if the process will forever loop in loop and not deal with gen_server messages?

Lee Sylvester

unread,
Apr 17, 2013, 1:57:35 PM4/17/13
to JD Bothma, erlang-questions@erlang.org Questions
It's checking for messages from RabbitMQ.

Cheers,
Lee

Garrett Smith

unread,
Apr 17, 2013, 2:01:00 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
If you're receiving messages in the loop, you're working too hard :)

Once you start the gen_server, any messages sent to that process (i.e.
anything that you'd receive in your loop) can be handled via
handle_info/2.

If you're polling something, see if you can configure that something
to send you a message directly instead. I know you can do this with
the rabbit client.

Lee Sylvester

unread,
Apr 17, 2013, 2:27:52 PM4/17/13
to Garrett Smith, erlang-questions@erlang.org Questions
Ahh, thank you. I have tried to find this using Google, but was unable to find an example. Any idea what kind of function signature I'd use?

Thanks,
Lee

Garrett Smith

unread,
Apr 17, 2013, 2:38:20 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
Something like this:

{ok, Connection} = amqp_connection:start(Params),
{ok, Channel} = amqp_connection:open_channel(Connection),
QDecl = #'queue.declare'{queue=Name, auto_delete=true, exclusive=true},
#'queue.declare_ok'{queue=Queue} = amqp_channel:call(Channel, QDecl),
QBind = #'queue.bind'{queue=Queue, exchange=Exchange, routing_key=Binding},
amqp_channel:call(Channel, QBind)

At this point, messages delivered to the queue should be sent to your
process (handle via handle_info/2) as this:

{#'basic.deliver'{exchange=Exchange, routing_key=Key}, Msg}

Rabbit plugins are implemented as Erlang apps, so I would imagine
there are some good examples out there to look at as well.

Garrett

Tim Watson

unread,
Apr 17, 2013, 3:05:01 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
Hi,

When you subscribe, channel messages will be sent to the supplied Pid whenever something happens, e.g., basic.deliver etc.

So you don't need to go into a loop, because you've passed self() which in init/1 is the server's process id. What you do instead is to code your handle_info/2 clauses to deal with the AMQP messages.

You might want to take a look at amqp_gen_consumer as an alternative to gen_server for this as well.

Cheers,
Tim

Lee Sylvester

unread,
Apr 17, 2013, 3:07:56 PM4/17/13
to Garrett Smith, erlang-questions@erlang.org Questions
Thanks, Garrett. That give me some direction.

Regards,
Lee

Lee Sylvester

unread,
Apr 17, 2013, 3:08:39 PM4/17/13
to Tim Watson, erlang-questions@erlang.org Questions
Thanks, Tim. I'll take a look. I've been searching for examples, but they're not that common. I'll have a go at searching on amqp_gen_consumer… I'm sure that'll throw something up :-)

Regards,
Lee

JD Bothma

unread,
Apr 17, 2013, 3:13:29 PM4/17/13
to Lee Sylvester, erlang-questions@erlang.org Questions
Cool.

Like a few replies have said now, handle_info in the gen_server callback module is where you can handle those.

If all you need is to handle messages from rabbitmq, you could get away with a plain process, perhaps being supervised.

In the long run it's worth the added benefits of proc_lib as Garret suggested, which you get for free in gen_server, but gen_server adds the call and cast options. If you don't need gen_server call and cast, it's worth looking into "OTP-compliant special processes" http://www.erlang.org/doc/design_principles/spec_proc.html - often you don't need a gen_server, or forging a gen_server into what you want isn't the cleanest solution.

The easiest way to get started with something good enough, though, is gen_server :)

Tim Watson

unread,
Apr 17, 2013, 3:50:11 PM4/17/13
to JD Bothma, erlang-questions@erlang.org Questions
On 17 Apr 2013, at 20:13, JD Bothma wrote:

The easiest way to get started with something good enough, though, is gen_server :)


That's usually true, but if you're consuming RabbitMQ messages then amqp_selective_consumer or amqp_gen_consumer would probably be the simplest choice. :)

Cheers,
Tim

Ali Sabil

unread,
Apr 18, 2013, 4:15:44 AM4/18/13
to Tim Watson, erlang-questions@erlang.org Questions
I case you need something that is OTP compliant while allowing you to drive the receive loop yourself, you can take a look at gen_process: https://github.com/asabil/gen_process which is a lower level alternative to gen_server and gen_fsm

Cheers,
Ali


Tim Watson

unread,
Apr 18, 2013, 4:42:26 AM4/18/13
to Ali Sabil, erlang-questions@erlang.org Questions
Just to clarify - if you're consuming RabbitMQ messages then amqp_selective_consumer or amqp_gen_consumer will be the simplest choice. All the other advice is fine and good, but the OP is clearly relatively new to Erlang/OTP and those simplifying behaviours are clearly more appropriate to his needs.

On 18 Apr 2013, at 09:15, Ali Sabil wrote:
I case you need something that is OTP compliant while allowing you to drive the receive loop yourself, you can take a look at gen_process: https://github.com/asabil/gen_process which is a lower level alternative to gen_server and gen_fsm


I've not seen this before - it sounds interesting so I'll take a look. :)

Cheers,
Tim

Ali Sabil

unread,
Apr 18, 2013, 5:09:34 AM4/18/13
to Tim Watson, erlang-questions@erlang.org Questions
I have been using it in production under heavy load for quite some time so it's very stable. I haven't had time to document it though, so patches are very welcome :)
Reply all
Reply to author
Forward
0 new messages