I am translating kafka's streams word count example to scala version.
But compiler complains inferred type arguments do not conform to map function's type parameter bounds, and type mismatch.
inferred type arguments [?1,?0] do not conform to method map's type parameter bounds [KR,VR]
[error] ).map(
[error] ^
type mismatch;
[error] found : org.apache.kafka.streams.kstream.KeyValueMapper[String,String,org.apache.kafka.streams.KeyValue[String,String]]
[error] required: org.apache.kafka.streams.kstream.KeyValueMapper[_ >: String, _ >: String, _ <: org.apache.kafka.streams.KeyValue[_ <: KR, _ <: VR]]
[error] new KeyValueMapper[String, String, KeyValue[String, String]] {
[error] ^
The code snippet looks like
val counts = source.flatMapValues(
new ValueMapper[String, java.lang.Iterable[String]] {
override def apply(value: String): java.lang.Iterable[String] = {
val ary = value.toLowerCase(Locale.getDefault()).split(" ")
Arrays.asList(ary).asInstanceOf[java.lang.Iterable[String]]
}
}
).map(
new KeyValueMapper[String, String, KeyValue[String, String]] {
override def apply(key: String, value: String): KeyValue[String, String] = new KeyValue(value, value)
}
).groupByKey().count("Counts");
How can I fix this compilation error?
Thanks.