Hi,
The below ETL works fine when the list in the below $$posts PState is not subindexed.
StreamTopology postsTopology = topologies.stream("posts");
postsTopology.pstate("$$posts", PState.mapSchema(Long.class, PState.listSchema(Post.class).subindexed()));
postsTopology.source("*postsDepot").out("*post")
.macro(extractJavaFields("*post", "*userId", "*content", "*postedBy"))
.each(Ops.PRINTLN, "Post to be created:", "*post")
.hashPartition("$$posts", "*userId")
.localTransform("$$posts", Path.key("*userId").beforeElem().termVal("*post"))
However, it throws an error when the list is subindexed.
clojure.lang.ExceptionInfo: Serializer not defined for type class rpl.rama.durable.RocksDBWrapper
at rpl.rama.util.nippy_serialization$fn__25646$fn__25647.invoke(nippy_serialization.clj:144) ~[rama-0.11.4.jar:?]
at taoensso.nippy$fn__7950.invokeStatic(nippy.clj:1242) ~[rama-0.11.4.jar:?]
at taoensso.nippy$fn__7950.invoke(nippy.clj:1238) ~[rama-0.11.4.jar:?]
....
....
Here's how the Post class looks like,
public class Post implements RamaSerializable {
public Post(Long userId, String content, Long postedBy) {
this.userId = userId;
this.content = content;
this.postedBy = postedBy;
}
public Long userId;
public String content;
public Long postedBy;
@Override
public String toString() {
StringBuilder sb = new StringBuilder("Post: {");
sb.append(" userId="+userId+"\n");
sb.append(", content="+content+"\n");
sb.append(", postedBy="+postedBy+"\n");
sb.append(" }");
return sb.toString();
}
}
Wondering what's wrong here. Attached full stack trace.