Potential performance issue when intercepting messages


Dario Heinisch

Sep 30, 2022, 3:56:27 PM
to phoenix-core
Hi there, 

I have been running into an issue which might lead to a feature proposal, or at least to a clarification for me.

I have around 2K connected clients across roughly 100 different channels (not all are connected to the same channel). One channel in particular has around 1K to 1.5K active connections.

The pipeline is as follows for all channels: Kafka consumer -> broadcast on a channel

Now, most channels show no offset issues and process their data relatively fast; only the Kafka consumer which broadcasts on the channel with the 1K-1.5K clients has issues and often falls behind, resulting in a huge consumer lag. It receives around 500-2000 messages per second.

The only difference between this channel and the others is that it intercepts the outgoing messages. The reason is that users have custom filters, and messages are only sent to them if the filters match; the interception does not transform the messages.
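
To give an idea, the channel roughly looks like this (a simplified sketch; module name, event name, and the filter check are illustrative):

defmodule MyAppWeb.FeedChannel do
  use Phoenix.Channel

  intercept ["new_msg"]

  @impl true
  def join("feed:lobby", params, socket) do
    # The user's custom filters come from the client on join (simplified)
    {:ok, assign(socket, :filters, params["filters"] || [])}
  end

  @impl true
  def handle_out("new_msg", payload, socket) do
    # Only push if the user's filters match; the payload itself is never transformed
    if matches_filters?(socket.assigns.filters, payload) do
      push(socket, "new_msg", payload)
    end

    {:noreply, socket}
  end

  # Stand-in for the real, app-specific filter logic
  defp matches_filters?(filters, payload), do: payload["topic"] in filters
end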

I looked into the Phoenix code and noticed this piece of code: https://github.com/phoenixframework/phoenix/blob/v1.6.13/lib/phoenix/channel/server.ex#L85

So if I read that right, when I intercept a message, the message is sent to each subscriber process (L93-96), where the `handle_out` callback is called, which then pushes it to the socket (if `handle_out` calls `push(socket, ...)`). But this means that if I intercept a message for a channel where I have 1K clients, each of those clients pushes to its own socket. As a result I serialize the same message 1K times, because each push calls Channel.push - https://github.com/phoenixframework/phoenix/blob/v1.6.13/lib/phoenix/channel.ex#L583 - which then calls Server.push - https://github.com/phoenixframework/phoenix/blob/v1.6.13/lib/phoenix/channel/server.ex#L240 - which serializes the message.

I am wondering if that is the cause of the slowness. 

Since I only intercept messages with the goal of filtering them out, not modifying them, is there maybe a better way, or is this not yet possible and up for discussion?

The slowness I am seeing is pretty significant, and I am unsure how else to fix it or debug where it is coming from. Removing the broadcast calls in the Kafka consumer immediately clears the offset lag.

Hope my explanation & issue make sense; if anything is unclear, please let me know!

Hope to hear back & happy day,
Dario 

Chris McCord

Sep 30, 2022, 4:19:12 PM
to phoenix-core
Yes, your understanding of intercept is correct, as we call out in the docs:

>   *Note*: intercepting events can introduce significantly more overhead if a
>   large number of subscribers must customize a message since the broadcast will
>   be encoded N times instead of a single shared encoding across all subscribers.

If I understand correctly, 2000 messages a second to the Kafka consumer are being rebroadcast to 1k clients, so we're talking 2M messages/second? It sounds like you need to rethink your forwarding logic, but it's not clear exactly what you're doing. For example, if you are sending 500-2k messages/s down the pipe to end users, it's likely too much for an interface to handle reasonably, and you may need to debounce or collect the messages and publish a bulk summary.
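
For example, something along these lines (a rough sketch, module and topic names are made up): the Kafka consumer hands messages to a small GenServer that buffers them and broadcasts one batch per interval instead of one broadcast per message.

defmodule MyApp.BatchedBroadcaster do
  use GenServer

  @flush_interval 250

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Called from the Kafka consumer instead of broadcasting directly
  def enqueue(msg), do: GenServer.cast(__MODULE__, {:enqueue, msg})

  @impl true
  def init(_opts) do
    schedule_flush()
    {:ok, []}
  end

  @impl true
  def handle_cast({:enqueue, msg}, buffer), do: {:noreply, [msg | buffer]}

  @impl true
  def handle_info(:flush, buffer) do
    unless buffer == [] do
      # A single broadcast (and a single shared encoding) for the whole batch
      MyAppWeb.Endpoint.broadcast("prices:all", "batch", %{items: Enum.reverse(buffer)})
    end

    schedule_flush()
    {:noreply, []}
  end

  defp schedule_flush, do: Process.send_after(self(), :flush, @flush_interval)
end

That keeps the consumer's hot loop free of per-message broadcasts and gives the UI something it can actually render.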

Dario Heinisch

Oct 2, 2022, 8:30:33 AM
to phoenix-core
Hi Chris, 

Ahhh, somehow missed that part in the docs, glad it is in there! :)

The amount of messages that go out is significantly lower (I would need to check the exact numbers since a lot are being filtered out), but yeah,
I definitely have to add some scaling support nonetheless, and that should be relatively "easy" with Phoenix/Elixir.

Back to the intercepting: I feel like the dispatch function could be extended to better support the case where one only wants to filter per socket,
while Phoenix could still reuse the shared encoding, for instance like this:

  def dispatch(subscribers, from, %Broadcast{event: event} = msg) do
    Enum.reduce(subscribers, %{}, fn
      {pid, _}, cache when pid == from ->
        cache

      # Assumes the fastlane tuple is extended to also carry the events the
      # channel only wants to filter (event_filtering), next to the ones it
      # wants to intercept.
      {pid, {:fastlane, fastlane_pid, serializer, event_intercepts, event_filtering}}, cache ->
        cond do
          event in event_intercepts ->
            send(pid, msg)
            cache

          event in event_filtering ->
            # Ask the channel process whether this subscriber should receive the
            # message, but keep reusing the shared encoded payload
            if GenServer.call(pid, {:filter, msg}) do
              fastlane(fastlane_pid, serializer, msg, cache)
            else
              cache
            end

          true ->
            fastlane(fastlane_pid, serializer, msg, cache)
        end

      {pid, _}, cache ->
        send(pid, msg)
        cache
    end)

    :ok
  end

  # Encode the broadcast at most once per serializer and reuse it for every subscriber
  defp fastlane(fastlane_pid, serializer, msg, cache) do
    case cache do
      %{^serializer => encoded_msg} ->
        send(fastlane_pid, encoded_msg)
        cache

      %{} ->
        encoded_msg = serializer.fastlane!(msg)
        send(fastlane_pid, encoded_msg)
        Map.put(cache, serializer, encoded_msg)
    end
  end

So instead of sending the whole message to each connected channel process, we add more fine-grained support for the case
where one only wants to filter messages based on some logic tied to the connected user. That way we would not
serialize a message multiple times if the message content stays the same.
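
On the channel side this could map to something like the following (completely hypothetical: `filter/1` and `handle_filter/3` do not exist today, and the naming is up for discussion):

defmodule MyAppWeb.FeedChannel do
  use Phoenix.Channel

  # Hypothetical macro, analogous to intercept/1: register events that should
  # only be filtered per subscriber, never transformed
  filter ["new_msg"]

  # Hypothetical callback backing the GenServer.call(pid, {:filter, msg}) above:
  # return true to let the shared, already-encoded broadcast through, false to drop it
  def handle_filter("new_msg", payload, socket) do
    payload["topic"] in socket.assigns.filters
  end
end

The important part is that the serializer runs at most once per serializer for a given broadcast, no matter how many subscribers filter the event.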

Let me know what you think; I would happily start working on it if you have no concerns.

Best regards,
Dario
