/**
 * Guice provider that creates the actor system named "cluster".
 *
 * The system is built lazily, on the first call to `get`, from the `cluster`
 * section of the application configuration. A stop hook is registered with the
 * application lifecycle so the actor system is terminated when the application stops.
 *
 * @param configuration        application configuration; must contain a `cluster` section
 * @param applicationLifecycle lifecycle used to register the termination hook
 */
@Singleton
class ClusterActorSystemProvider @Inject()(configuration: Configuration, applicationLifecycle: ApplicationLifecycle) extends Provider[ActorSystem] {
  private val logger = Logger(this.getClass)

  /** The cluster actor system; created once on first access. */
  override lazy val get: ActorSystem = {
    // Fail with a descriptive message rather than a bare NoSuchElementException
    // from Option.get when the `cluster` section is absent.
    val clusterConfig = configuration.getConfig("cluster").getOrElse(
      throw new IllegalStateException(
        "Missing required configuration section 'cluster' for the cluster ActorSystem"
      )
    )
    val akkaSystem = ActorSystem("cluster", clusterConfig.underlying)
    logger.info("Starting Akka Cluster System")
    // Terminate the system when the application stops.
    applicationLifecycle.addStopHook(() => akkaSystem.terminate())
    akkaSystem
  }
}
/**
 * Guice provider that creates an actor of type `T` in the actor system
 * injected under the name "cluster".
 *
 * The actor instance itself is obtained from the injector; the resulting
 * `Props` are passed through `props` before the actor is created.
 *
 * @param name  name given to the created actor
 * @param props transformation applied to the base `Props` before creation
 * @tparam T    actor class to instantiate
 */
class ClusterActorRefProvider[T <: Actor: ClassTag](name: String, props: Props => Props) extends Provider[ActorRef] {

  // Populated by field injection, hence `var` rather than `val`.
  //noinspection VarCouldBeVal
  @Inject
  @Named("cluster")
  private var actorSystem: ActorSystem = _

  // Populated by field injection, hence `var` rather than `val`.
  //noinspection VarCouldBeVal
  @Inject
  private var injector: Injector = _

  /** The actor reference; the actor is created once, on first access. */
  lazy val get: ActorRef =
    actorSystem.actorOf(props(Props(injector.instanceOf[T])), name)
}
Guice Module:
/**
 * Binds an `ActorRef` annotated with `name` to an eagerly created actor of
 * type `T`, provided by a [[ClusterActorRefProvider]].
 *
 * @param name  actor name, also used as the `@Named` binding annotation
 * @param props transformation applied to the actor's `Props` before creation;
 *              defaults to leaving them unchanged
 * @tparam T    actor class to bind
 */
def bindGuiceClusterActor[T <: Actor : ClassTag](name: String, props: Props => Props = identity): Unit = {
  bind(classOf[ActorRef])
    .annotatedWith(Names.named(name))
    // Forward the caller's `props`; previously `identity` was hard-coded here,
    // so any customisation passed by the caller was silently dropped.
    .toProvider(Providers.guicify(new ClusterActorRefProvider[T](name, props)))
    .asEagerSingleton()
}
/**
 * Registers the "cluster" actor system provider and eagerly binds the
 * `ClusterActor` under the name "cluster-actor".
 */
override def configure(): Unit = {
  val clusterSystemName = Names.named("cluster")
  bind(classOf[ActorSystem])
    .annotatedWith(clusterSystemName)
    .toProvider(classOf[ClusterActorSystemProvider])

  bindGuiceClusterActor[ClusterActor]("cluster-actor")
}
/**
 * Module configuration (excerpt; surrounding bindings are elided with "...").
 * Binds ActorA under the name "actorAName" via bindActor.
 */
@Override
protected void configure() {
...
bindActor(ActorA.class, "actorAName");
...
}
/**
 * On start, sends this actor the INIT message (with no sender) so that
 * initialisation is handled in onReceive.
 */
@Override
public void preStart() {
    final ActorRef selfRef = self();
    selfRef.tell(INIT, ActorRef.noSender());
}
/**
 * Handles the INIT message by creating ActorB in the actor system, named by
 * the CONF_KEY configuration value. Any other message is reported as unhandled.
 *
 * @param message the received message
 */
@Override
public void onReceive(Object message) throws Exception {
    // Guard clause: anything other than INIT is not ours to handle.
    if (!INIT.equals(message)) {
        unhandled(message);
        return;
    }

    context().system().actorOf(
            ActorB.props(gpsService, jpaService),
            configuration.getString(CONF_KEY));
}
/**
 * Example that starts two Kafka consumers on the "topic-test" topic, three
 * seconds after construction, and prints every record each of them receives.
 *
 * @param actorSystem actor system used for the materializer, the consumer
 *                    settings and the start-up scheduler
 */
class KafkaExample(actorSystem: ActorSystem) {
  implicit val materializer: ActorMaterializer = ActorMaterializer()(actorSystem)

  val kafkaSettings: akka.kafka.ConsumerSettings[String, String] =
    akka.kafka.ConsumerSettings[String, String](
      actorSystem,
      new StringDeserializer,
      new StringDeserializer,
      Set("topic-test")
    ).withBootstrapServers("localhost:9092")

  // This scheduler does not restart at Dev reload!
  actorSystem.scheduler.scheduleOnce(3.seconds) {
    play.Logger.info("Starting reactive-kafka example.")
    startConsumer("consumer1", "Reader 1")
    startConsumer("consumer2", "Reader 2")
  }

  /**
   * Starts a plain consumer stream that prints each record, prefixed with `label`.
   *
   * @param id    used as both the consumer group id and the client id
   * @param label prefix for the printed lines
   */
  private def startConsumer(id: String, label: String): Unit = {
    // The Future returned by runForeach is intentionally discarded, as in the
    // original inline code: the stream just runs in the background.
    Consumer.plainSource(
      kafkaSettings
        .withGroupId(id)
        .withClientId(id)
    ).runForeach { record =>
      println(s"$label: «${record.toString}»")
    }
  }
}
Has anyone else experienced this?
Has anyone else experienced this? You need to clean up afterwards.
life.addStopHook { () => |
println("Stopping ClusterService..") |
Future.successful(Unit) |
--
You received this message because you are subscribed to the Google Groups "play-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to play-framewor...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/play-framework/e2543bcc-2eb6-4464-a2f4-de3dbc117b61%40googlegroups.com.
--
You received this message because you are subscribed to a topic in the Google Groups "play-framework" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/play-framework/1hHsCr_3y2U/unsubscribe.
To unsubscribe from this group and all its topics, send an email to play-framewor...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/play-framework/e2543bcc-2eb6-4464-a2f4-de3dbc117b61%40googlegroups.com.
--
You received this message because you are subscribed to a topic in the Google Groups "play-framework" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/play-framework/1hHsCr_3y2U/unsubscribe.
To unsubscribe from this group and all its topics, send an email to play-framewor...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/play-framework/CAJmgB627eckwUmjoEo1RnrqFS4xg5rpZv9qN%2BgXs9PH_VtUYMg%40mail.gmail.com.
It's not even particularly specific to Akka - any eagerly injected service that tried to bind to a specific port would probably fail in the same way as it found the old version was still holding on to the port.