Akka TCP peer-to-peer architecture instead of client-server


Chris Stewart

Jun 21, 2016, 4:36:36 AM
to Akka User List
Hi all, 

I posted this question to Stack Overflow and haven't had any luck getting an answer there, so I thought I would try here: 


In the current Akka documentation there is a [nice example of creating a client-server architecture][1]. I'm creating an Akka actor that can send and receive messages on the Bitcoin protocol. So far I've been able to send messages and receive replies to the messages I sent, but I haven't been able to receive unsolicited messages, as the peer-to-peer protocol requires. 

I've tried to use `Tcp.Bind` and `Tcp.Connect` to listen for unsolicited messages on port `18333` while also being able to send messages to a peer on the network. However, I run into an issue where either it says that the port is already bound (by the `Tcp.Connect` command) or it can't send messages from that port (due to the `Tcp.Bind` command). 

How can I send messages and receive unsolicited messages on the same port? Am I missing something here? 

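    import java.net.InetSocketAddress

    import akka.actor.{Actor, ActorRef, ActorSystem}
    import akka.io.{IO, Tcp}

    // BitcoinSLogger, BitcoinSUtil and NetworkParameters are not shown in this post.
    sealed trait Client extends Actor with BitcoinSLogger {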
    
      /**
        * The address of the peer we are attempting to connect to
        * on the p2p network
        * @return
        */
      def remote: InetSocketAddress
    
      /**
        * The actor that is listening to all communications between the
        * client and its peer on the network
        * @return
        */
      def listener : ActorRef
    
    
      def actorSystem : ActorSystem
      /**
        * The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
        * and instantiates workers for specific tasks, such as listening to incoming connections.
        */
      def manager : ActorRef = IO(Tcp)(actorSystem)
    
    
      /**
        * This actor signifies the node we are connected to on the p2p network
        * This is set when we received a [[Tcp.Connected]] message
        */
      private var peer : Option[ActorRef] = None
    
      def receive = {
        case message : Tcp.Message => message match {
          case event : Tcp.Event =>
            logger.debug("Event: " + event)
            handleEvent(event)
          case command : Tcp.Command =>
            logger.debug("Command: " + command)
            handleCommand(command)
        }
        case unknownMessage => throw new IllegalArgumentException("Unknown message for client: " + unknownMessage)
      }
    
      /**
        * This function is responsible for handling a [[Tcp.Event]] algebraic data type
        * @param event
        */
      private def handleEvent(event : Tcp.Event) = event match {
        case Tcp.Bound(localAddress) =>
          logger.debug("Actor is now bound to the local address: " + localAddress)
        case Tcp.CommandFailed(w: Tcp.Write) =>
          logger.debug("Client write command failed: " + Tcp.CommandFailed(w))
          logger.debug("O/S buffer was full")
          // O/S buffer was full
          //listener ! "write failed"
        case Tcp.CommandFailed(command) =>
          logger.debug("Client Command failed:" + command)
        case Tcp.Received(data) =>
          logger.debug("Received data from our peer on the network: " + BitcoinSUtil.encodeHex(data.toArray))
          //listener ! data
        case Tcp.Connected(remote, local) =>
          logger.debug("Tcp connection to: " + remote)
          logger.debug("Local: " + local)
          peer = Some(sender)
          peer.get ! Tcp.Register(listener)
          listener ! Tcp.Connected(remote,local)
        case Tcp.ConfirmedClosed =>
          logger.debug("Client received confirmed closed msg: " + Tcp.ConfirmedClosed)
          peer = None
          context stop self
      }
      /**
        * This function is responsible for handling a [[Tcp.Command]] algebraic data type
        * @param command
        */
      private def handleCommand(command : Tcp.Command) = command match {
        case Tcp.ConfirmedClose =>
          logger.debug("Client received connection closed msg: " + Tcp.ConfirmedClose)
          listener ! Tcp.ConfirmedClose
          peer.get ! Tcp.ConfirmedClose
      }
    
    }
    
    
    case class ClientImpl(remote: InetSocketAddress, network : NetworkParameters,
                          listener: ActorRef, actorSystem : ActorSystem) extends Client {
      manager ! Tcp.Bind(listener, new InetSocketAddress(remote.getPort))
    
      //this eagerly connects the client with our peer on the network as soon
      //as the case class is instantiated
      manager ! Tcp.Connect(remote, Some(new InetSocketAddress(remote.getPort)))
    
    }

Chris Stewart

Jun 29, 2016, 1:11:13 AM
to Akka User List
It looks like Akka is missing messages that are sent microseconds apart. I've added the Wireshark output captured while running this test: my machine receives the messages, but Akka does not register them. However, Akka does seem to register messages that are received seconds later. Here is the SO post: https://stackoverflow.com/questions/37818347/akka-tcp-create-peer-to-peer-architecture-instead-of-client-server

and here is the GitHub repo if you want to see the behavior yourself: 

Endre Varga

Jun 29, 2016, 5:35:48 AM
to akka...@googlegroups.com
On Tue, Jun 21, 2016 at 12:41 AM, Chris Stewart <stewart....@gmail.com> wrote:
I've tried to use `Tcp.Bind` and `Tcp.Connect` to listen for unsolicited messages on port `18333` while also being able to send messages to a peer on the network. However, I run into an issue where either it says that the port is already bound (by the `Tcp.Connect` command) or it can't send messages from that port (due to the `Tcp.Bind` command). 

Sorry, but I don't follow. You bind to a port to accept incoming TCP connections. Once there is an incoming connection, you are notified of it and you can send messages over that connection. I don't see how you would be able to send to a random peer on the network without either connecting to it or having it connect to you. You can do that with UDP, but not with TCP. Akka is no different from any other socket-based technology here; we don't hide anything from the JDK network functionality.

I sense some kind of misunderstanding here. Can you please clarify?

-Endre
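
To illustrate the point about binding versus connecting, here is a minimal sketch (not code from the thread) of one actor that listens for inbound peers and also dials out. It assumes the classic `akka.io.Tcp` API; `PeerNode`, `listenPort` and the `connections` set are hypothetical names introduced only for this example. The outbound `Tcp.Connect` deliberately passes no local address, so the OS picks an ephemeral source port and the listening port stays free for `Tcp.Bind`.

```scala
import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.io.{IO, Tcp}
import akka.util.ByteString

// Hypothetical actor for illustration only; it is not part of the original post.
class PeerNode(listenPort: Int, remote: InetSocketAddress)
    extends Actor with ActorLogging {

  import context.system // implicit ActorSystem needed by IO(Tcp)

  // Connection actors for every established connection, inbound or outbound.
  private var connections = Set.empty[ActorRef]

  override def preStart(): Unit = {
    // Accept inbound peers on the well-known port.
    IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress(listenPort))
    // Dial out with no local address: the OS assigns an ephemeral source port,
    // so this does not compete with the Bind above.
    IO(Tcp) ! Tcp.Connect(remote)
  }

  def receive: Receive = {
    case Tcp.Bound(local) =>
      log.debug("Listening on {}", local)

    case Tcp.Connected(peerAddress, localAddress) =>
      // Sent for both accepted and dialed connections; the sender is the
      // connection actor, which must be given a handler via Register.
      val connection = sender()
      connection ! Tcp.Register(self)
      connections += connection
      log.debug("Connected to {} from {}", peerAddress, localAddress)

    case Tcp.Received(data) =>
      // Replies and unsolicited messages both arrive here.
      log.debug("Received {} bytes", data.length)

    case outgoing: ByteString =>
      // Any established connection can be written to, whichever side opened it.
      connections.foreach(_ ! Tcp.Write(outgoing))

    case Tcp.CommandFailed(command) =>
      log.warning("Command failed: {}", command)

    case _: Tcp.ConnectionClosed =>
      connections -= sender()
  }
}

object PeerNode {
  def props(listenPort: Int, remote: InetSocketAddress): Props =
    Props(new PeerNode(listenPort, remote))
}
```

Under these assumptions, a peer that connects to `listenPort` and a peer reached through `Tcp.Connect` are handled identically once `Tcp.Connected` arrives, since a TCP connection is bidirectional regardless of which side opened it.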
 


Endre Varga

Jun 29, 2016, 5:39:59 AM
to akka...@googlegroups.com
On Wed, Jun 29, 2016 at 2:25 AM, Chris Stewart <stewart....@gmail.com> wrote:
It looks like akka is missing messages that are sent microseconds apart. I

Can you please post a test reproducing this issue? Akka TCP has existed for more than 3 years now, and Akka Streams and Akka HTTP are built on top of it and have themselves existed for almost 2 years (not to mention Spray). I am therefore skeptical that messages would simply go missing because of such a simple timing issue, in a way that is so easy to trigger. I cannot exclude the possibility, of course, but I would like to see a reproducer first.

Maybe there is some subtle bug in your code? 

-Endre
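
One possible cause worth ruling out, which the thread does not confirm, is message framing. TCP delivers a byte stream, so messages sent microseconds apart can arrive coalesced in a single `Tcp.Received` chunk, or one message can be split across several chunks. The sketch below assumes the standard 24-byte Bitcoin P2P header (4-byte magic, 12-byte command, 4-byte little-endian payload length, 4-byte checksum); `Framing` and `split` are hypothetical names, not part of Akka or of the poster's project.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper for illustration only.
object Framing {
  // magic (4) + command (12) + payload length (4) + checksum (4)
  val HeaderSize = 24

  /** Splits the accumulated bytes into complete messages (header plus payload)
    * and returns them together with the unconsumed remainder.
    */
  def split(buffer: Array[Byte]): (Vector[Array[Byte]], Array[Byte]) = {
    @annotation.tailrec
    def loop(rest: Array[Byte],
             acc: Vector[Array[Byte]]): (Vector[Array[Byte]], Array[Byte]) =
      if (rest.length < HeaderSize) (acc, rest) // header not complete yet
      else {
        // The payload length sits at offset 16, encoded little-endian.
        val payloadLength =
          ByteBuffer.wrap(rest, 16, 4).order(ByteOrder.LITTLE_ENDIAN).getInt()
        require(payloadLength >= 0, s"Invalid payload length: $payloadLength")
        val total = HeaderSize.toLong + payloadLength
        if (rest.length < total) (acc, rest) // payload not complete yet
        else loop(rest.drop(total.toInt), acc :+ rest.take(total.toInt))
      }
    loop(buffer, Vector.empty)
  }
}
```

A handler could append each `Tcp.Received` chunk to a buffer, call `Framing.split` on it, process every complete message, and keep the remainder for the next chunk. If the existing parser assumes exactly one message per `Tcp.Received`, any message coalesced into the same chunk would appear to be missing.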

 
