val serverInterceptor = MonitoringServerInterceptor.create(Configuration.allMetrics())val realtimeServiceWithMonitoring = ServerInterceptors.intercept(
RealtimePublishGrpc.bindService(realtimeService, ExecutionContext.global),
serverInterceptor)
val rppServiceWithMonitoring = ServerInterceptors.intercept(
RealtimeProxyGrpc.bindService(realtimePublishProxyService, ExecutionContext.global),
serverInterceptor
)
val keyManagerFactory = GrpcSSLHelper.getKeyManagerFactory(sslConfig)
val trustManagerFactory = GrpcSSLHelper.getTrustManagerFactory(sslConfig)
val serverGrpcSslContexts = GrpcSSLHelper.getServerSslContext(keyManagerFactory, trustManagerFactory)
NettyServerBuilder
.forPort(8086)
.sslContext(serverGrpcSslContexts)
.addService(realtimeServiceWithMonitoring)
.addService(rppServiceWithMonitoring)
.build()
}
private val interceptor = MonitoringClientInterceptor.create(Configuration.allMetrics())val trustManagerFactory = GrpcSSLHelper.getTrustManagerFactory(sslConfig)
NettyChannelBuilder
.forAddress(address, 8086)
.intercept(interceptor)
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSSLHelper.getClientSslContext(keyManagerFactory, trustManagerFactory))
.build()
val realtimeServiceWithMonitoring =
ServerInterceptors.intercept(
RealtimePublishGrpc.bindService(realtimeService, ExecutionContext.global),
serverInterceptor)
val rppServiceWithMonitoring = ServerInterceptors.intercept(
RealtimeProxyGrpc.bindService(realtimePublishProxyService, ExecutionContext.global),
serverInterceptor
)
NettyServerBuilder
.forPort(8086)
.sslContext(serverGrpcSslContexts)
.addService(realtimeServiceWithMonitoring)
.addService(batchPublishWithMonitoring)
.addService(rppServiceWithMonitoring)
.executor(ForkJoinPool.commonPool())
.build()
override def publish(request: PublishRequest): Future[PublishResponse] = {
logger.debug("Received Publish request: " + request)
Future.successful(PublishResponse())
}
private def channelBuilder(address: String): ManagedChannel = {
val interceptor = MonitoringClientInterceptor.create(Configuration.allMetrics())
val builder = NettyChannelBuilder
.forAddress(address, realtimePublishProxyConfig.realtimeInstanceConfig.realtimeServicePort)
.intercept(interceptor)
.executor(ForkJoinPool.commonPool())
.eventLoopGroup(nioEventLoopGroup)
clientSslContext match {
case Some(sslContext) => builder.sslContext(sslContext).useTransportSecurity()
case None => builder.usePlaintext(true)
}
builder.build()
}