I posted this question to stackoverflow and haven't had any luck with it being answered there, so I thought I would try here:
In the current Akka documentation there is a [nice example of creating a client server architecture][1]. I'm creating a Akka actor that can send and receive messages on the bitcoin protocol. So far I've been able to send messages & receive replies to the message I sent, but I haven't been able to receive unsolicited messages as required on the peer to peer protocol.
I've tried to use `Tcp.Bind` and `Tcp.Connect` to be able to listen to unsolicited messages on port `18333` whistle also being able to send messages to a peer on the network. However, I run into this issue where it will say that the port is already bound (by the `Tcp.Connect` event) or it won't be able to send messages from that port (due to the `Tcp.Bind` event).
How can I send messages and receive unsolicited messages on the same port? Am I missing something here?
sealed trait Client extends Actor with BitcoinSLogger {
/**
* The address of the peer we are attempting to connect to
* on the p2p network
* @return
*/
def remote: InetSocketAddress
/**
* The actor that is listening to all communications between the
* client and its peer on the network
* @return
*/
def listener : ActorRef
def actorSystem : ActorSystem
/**
* The manager is an actor that handles the underlying low level I/O resources (selectors, channels)
* and instantiates workers for specific tasks, such as listening to incoming connections.
*/
def manager : ActorRef = IO(Tcp)(actorSystem)
/**
* This actor signifies the node we are connected to on the p2p network
* This is set when we received a [[Tcp.Connected]] message
*/
private var peer : Option[ActorRef] = None
def receive = {
case message : Tcp.Message => message match {
case event : Tcp.Event =>
logger.debug("Event: " + event)
handleEvent(event)
case command : Tcp.Command =>
logger.debug("Command: " + command)
handleCommand(command)
}
case unknownMessage => throw new IllegalArgumentException("Unknown message for client: " + unknownMessage)
}
/**
* This function is responsible for handling a [[Tcp.Event]] algebraic data type
* @param event
*/
private def handleEvent(event : Tcp.Event) = event match {
case Tcp.Bound(localAddress) =>
logger.debug("Actor is now bound to the local address: " + localAddress)
case Tcp.CommandFailed(w: Tcp.Write) =>
logger.debug("Client write command failed: " + Tcp.CommandFailed(w))
logger.debug("O/S buffer was full")
// O/S buffer was full
//listener ! "write failed"
case Tcp.CommandFailed(command) =>
logger.debug("Client Command failed:" + command)
case Tcp.Received(data) =>
logger.debug("Received data from our peer on the network: " + BitcoinSUtil.encodeHex(data.toArray))
//listener ! data
case Tcp.Connected(remote, local) =>
logger.debug("Tcp connection to: " + remote)
logger.debug("Local: " + local)
peer = Some(sender)
peer.get ! Tcp.Register(listener)
listener ! Tcp.Connected(remote,local)
case Tcp.ConfirmedClosed =>
logger.debug("Client received confirmed closed msg: " + Tcp.ConfirmedClosed)
peer = None
context stop self
}
/**
* This function is responsible for handling a [[Tcp.Command]] algebraic data type
* @param command
*/
private def handleCommand(command : Tcp.Command) = command match {
case Tcp.ConfirmedClose =>
logger.debug("Client received connection closed msg: " + Tcp.ConfirmedClose)
listener ! Tcp.ConfirmedClose
peer.get ! Tcp.ConfirmedClose
}
}
case class ClientImpl(remote: InetSocketAddress, network : NetworkParameters,
listener: ActorRef, actorSystem : ActorSystem) extends Client {
manager ! Tcp.Bind(listener, new InetSocketAddress(remote.getPort))
//this eagerly connects the client with our peer on the network as soon
//as the case class is instantiated
manager ! Tcp.Connect(remote, Some(new InetSocketAddress(remote.getPort)))
}