Issue with Scala Stream Collector and Kinesis


Jeetu Choudhary

Mar 30, 2016, 8:40:33 AM
to Snowplow
I am setting up the Scala Stream Collector and it is throwing this exception:

Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'enabled'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:115)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:136)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:150)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:155)
    at com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:165)
    at com.snowplowanalytics.snowplow.collectors.scalastream.CollectorConfig.<init>(ScalaCollectorApp.scala:140)
    at com.snowplowanalytics.snowplow.collectors.scalastream.ScalaCollector$delayedInit$body.apply(ScalaCollectorApp.scala:73)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at com.snowplowanalytics.snowplow.collectors.scalastream.ScalaCollector$.main(ScalaCollectorApp.scala:42)
    at com.snowplowanalytics.snowplow.collectors.scalastream.ScalaCollector.main(ScalaCollectorApp.scala)


My configuration file is:

collector {
  # The collector runs as a web service specified on the following
  # interface and port.
  interface = "0.0.0.0"
  port = 80

  # Production mode disables additional services helpful for configuring and
  # initializing the collector, such as a path '/dump' to view all
  # records stored in the current stream.
  production = true

  # Configure the P3P policy header.
  p3p {
    policyref = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    # Set to 0 to disable the cookie
    expiration = 365 days

    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    # domain = ""
  }

  sink {
    # Sinks currently supported are:
    # 'kinesis' for writing Thrift-serialized records to a Kinesis stream
    # 'stdout' for writing Base64-encoded Thrift-serialized records to stdout
    #    Recommended settings for 'stdout' so each line printed to stdout
    #    is a serialized record are:
    #      1. Setting 'akka.loglevel = OFF' and 'akka.loggers = []'
    #         to disable all logging.
    #      2. Using 'sbt assembly' and 'java -jar ...' to disable
    #         sbt logging.
    enabled = "kinesis"

    kinesis {
      thread-pool-size: 10 # Thread pool size for Kinesis API requests

      # The following are used to authenticate for the Amazon Kinesis sink.
      #
      # If both are set to 'cpf', a properties file on the classpath is used.
      # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/ClasspathPropertiesFileCredentialsProvider
      #
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      #
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        access-key: "***************"
        secret-key: "***********************"
      }

      # Data will be stored in the following stream.
      stream {
        region: "us-east-1"
        good: "collector.stream.good"
        bad: "collector.stream.bad"
      }

      # Minimum and maximum backoff periods
      backoffPolicy: {
        minBackoff: 3000 # 3 seconds
        maxBackoff: 600000 # 10 minutes
      }

      # Incoming events are stored in a buffer before being sent to Kinesis.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since the buffer was last emptied reaches time-limit
      buffer {
        byte-limit: 500000
        record-limit: 1000
        time-limit: 10000
      }
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/2.2.3/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

# spray-can is the server the Stream collector uses and has configurable
# options defined at
# https://github.com/spray/spray/blob/master/spray-can/src/main/resources/reference.conf
spray.can.server {
  # To obtain the hostname in the collector, the 'remote-address' header
  # should be set. By default, this is disabled, and enabling it
  # adds the 'Remote-Address' header to every request automatically.
  remote-address-header = on

  uri-parsing-mode = relaxed
  raw-request-uri-header = on

  # Define the maximum request length (the default is 2048)
  parsing {
    max-uri-length = 32768
  }
}


Fred Blundun

Mar 30, 2016, 8:48:28 AM
to snowpl...@googlegroups.com
Hi Jeetu,

As of R78, you need to explicitly enable cookies by setting `enabled = true` in the `cookie` section of the config file.
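
For example, the cookie section would then look something like this (a minimal sketch based on the R78 example config; keep your other settings as they are):

cookie {
  enabled = true
  # Set to 0 to disable the cookie
  expiration = 365 days
  # domain = ""
}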

You can see an example configuration file here.

Regards,
Fred


Jeetu Choudhary

Mar 30, 2016, 9:08:33 AM
to Snowplow
Thanks Fred

Now it is showing a new error:

18:34:52.308 [scala-stream-collector-akka.actor.default-dispatcher-5] DEBUG spray.can.server.HttpListener - Binding to /0.0.0.0:80
18:34:52.471 [scala-stream-collector-akka.actor.default-dispatcher-2] DEBUG akka.io.TcpListener - Bind failed for TCP channel on endpoint [/0.0.0.0:80]: java.net.SocketException: Permission denied
18:34:52.479 [scala-stream-collector-akka.actor.default-dispatcher-2] WARN  spray.can.server.HttpListener - Bind to /0.0.0.0:80 failed
18:34:52.481 [scala-stream-collector-akka.actor.default-dispatcher-2] INFO  akka.actor.DeadLetterActorRef - Message [akka.io.Tcp$CommandFailed] from Actor[akka://scala-stream-collector/user/IO-HTTP/listener-0#-885706939] to Actor[akka://scala-stream-collector/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
18:34:52.482 [scala-stream-collector-akka.actor.default-dispatcher-2] DEBUG akka.io.TcpListener - Closing serverSocketChannel after being stopped
18:34:52.492 [scala-stream-collector-akka.actor.default-dispatcher-4] INFO  akka.actor.LocalActorRef - Message [akka.dispatch.sysmsg.DeathWatchNotification] from Actor[akka://scala-stream-collector/system/IO-TCP/selectors/$a/0#81402244] to Actor[akka://scala-stream-collector/system/IO-TCP/selectors/$a/0#81402244] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Fred Blundun

Mar 30, 2016, 9:14:07 AM
to snowpl...@googlegroups.com
That `Permission denied` error usually means the collector lacks permission to bind to port 80: on Linux, only root can bind to ports below 1024. A bind failure can also mean that another process is already listening on the port; you can find out its pid using a command like `sudo netstat -tulpn | grep :80`.

Alternatively, try changing the port to something other than 80 (e.g. 8000) in your configuration file.
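
A minimal sketch of that change (only the port value differs from your posted config):

collector {
  interface = "0.0.0.0"
  port = 8000  # unprivileged port, so root is not required
}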

Regards,
Fred

Jeetu Choudhary

Mar 30, 2016, 9:15:31 AM
to Snowplow
Is it necessary to pass the name of the bad stream? And if it is, can you tell me how, and what is the use of the bad stream?

Jeetu Choudhary

Mar 30, 2016, 9:29:10 AM
to Snowplow
Thank you for the help Fred, it is working now.
Can you tell me what this log message is about:


18:49:50.461 [scala-stream-collector-akka.actor.default-dispatcher-2] INFO  akka.actor.DeadLetterActorRef - Message [akka.io.Tcp$Bound] from Actor[akka://scala-stream-collector/user/IO-HTTP/listener-0#-1591867560] to Actor[akka://scala-stream-collector/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'

Thanks

Fred Blundun

Mar 30, 2016, 9:58:12 AM
to snowpl...@googlegroups.com
Don't worry about that INFO message - it isn't actually a problem. (More information here.)

You can disable dead letter logging by setting `akka.log-dead-letters = 0` in the config.
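
In context, that would look like this (a sketch; the second setting, also named in the log message, additionally silences dead letters during shutdown):

akka {
  loglevel = DEBUG
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  log-dead-letters = 0               # disable dead letter logging entirely
  log-dead-letters-during-shutdown = off
}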

When the Scala Stream Collector fails to process an event (for example because the event is too large to fit in a Kinesis record) it sends a bad row JSON containing information about what went wrong to the bad stream (which is another Kinesis stream).
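
For illustration only (the exact field names vary between Snowplow releases, so treat this as a hypothetical shape rather than the definitive format), a bad row is a small JSON document along these lines:

{
  "line": "<Base64-encoded original payload>",
  "errors": ["Record exceeds the maximum Kinesis record size"],
  "failure_tstamp": "2016-03-30T18:49:50.000Z"
}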

Hope that helps,
Fred

Jeetu Choudhary

Mar 30, 2016, 10:24:36 AM
to Snowplow
Can we pass the same stream for both good and bad?
And is it necessary to pass a bad stream? If I don't, it shows an error about it.

Thanks

Fred Blundun

Mar 30, 2016, 10:37:44 AM
to snowpl...@googlegroups.com
You shouldn't configure the good stream and the bad stream to be the same stream, as any application reading that stream would then have to deal with both good events and failure JSONs.

You do have to configure the bad stream. I have created a ticket about possibly changing this here.

Regards,
Fred

Jeetu Choudhary

Mar 30, 2016, 1:24:46 PM
to Snowplow
Okay Fred

Thanks

Jeetu Choudhary

Mar 31, 2016, 5:20:33 PM
to Snowplow
I have set up the Scala Stream Collector and it is now running on localhost, but I want to send data to it from the JavaScript tracker. How can I do that?

Ihor Tomilenko

Mar 31, 2016, 5:39:32 PM
to Snowplow
Hi Jeetu,

The response to your last question will follow in the other thread you also created. Thank you for that, as we don't want to mix up different topics.

Regards,
Ihor