Subindexed list throws an error - Serializer not defined for type class rpl.rama.durable.RocksDBWrapper

60 views
Skip to first unread message

Sashidhar T

unread,
Jan 8, 2025, 10:05:29 AMJan 8
to rama-user
Hi,

The below ETL works fine when the list in the below $$posts PState is not subindexed. 

StreamTopology postsTopology = topologies.stream("posts");
postsTopology.pstate("$$posts", PState.mapSchema(Long.class, PState.listSchema(Post.class).subindexed()));

postsTopology.source("*postsDepot").out("*post")
.macro(extractJavaFields("*post", "*userId", "*content", "*postedBy"))
.each(Ops.PRINTLN, "Post to be created:", "*post")
.hashPartition("$$posts", "*userId")
.localTransform("$$posts", Path.key("*userId").beforeElem().termVal("*post"))


However, it throws an error when the list is subindexed.

clojure.lang.ExceptionInfo: Serializer not defined for type class rpl.rama.durable.RocksDBWrapper
at rpl.rama.util.nippy_serialization$fn__25646$fn__25647.invoke(nippy_serialization.clj:144) ~[rama-0.11.4.jar:?]
at taoensso.nippy$fn__7950.invokeStatic(nippy.clj:1242) ~[rama-0.11.4.jar:?]
at taoensso.nippy$fn__7950.invoke(nippy.clj:1238) ~[rama-0.11.4.jar:?]
.... 
.... 


Here's how the Post class looks like,

public class Post implements RamaSerializable {
public Post(Long userId, String content, Long postedBy) {
this.userId = userId;
this.content = content;
this.postedBy = postedBy;
}
public Long userId;
public String content;
public Long postedBy;

@Override
public String toString() {
StringBuilder sb = new StringBuilder("Post: {");
sb.append(" userId="+userId+"\n");
sb.append(", content="+content+"\n");
sb.append(", postedBy="+postedBy+"\n");
sb.append(" }");
return sb.toString();
}
}

Wondering what's wrong here. Attached full stack trace.

Thanks,
Sashi
subindexed_list_error_trace

Nathan Marz

unread,
Jan 8, 2025, 4:42:54 PMJan 8
to rama...@googlegroups.com
Is that error happening on foreign query? That error happens when you try to transfer a RocksDB-backed data structure over the network (such as a subindexed list). 

Your topology code looks like it should work fine, though keep in mind only .afterElem() is efficient on subindexed lists. .beforeElem() requires the whole thing to be rewritten, which is probably undesired, because the subindexed list keeps a map under the hood of index -> value.

--
You received this message because you are subscribed to the Google Groups "rama-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rama-user+...@googlegroups.com.
To view this discussion visit https://groups.google.com/d/msgid/rama-user/e1a9c7e5-b3f8-4e00-9858-ec1de4973e8dn%40googlegroups.com.

Sashidhar T

unread,
Jan 8, 2025, 9:31:42 PMJan 8
to rama...@googlegroups.com
Hi Nathan,

Yes the error is happening when doing a PState query from unit test. Let me know how I can fix this.

PState postsPState = ipc.clusterPState(moduleName, "$$posts");
List<Post> createdPosts = postsPState.selectOne(Path.key(userId));


You received this message because you are subscribed to a topic in the Google Groups "rama-user" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rama-user/ns0M14MSDr0/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rama-user+...@googlegroups.com.
To view this discussion visit https://groups.google.com/d/msgid/rama-user/CANRumhKiUx%3DVrLXBkLf7cmyMmOs01US_Qu8QMTYG6jDqKcWWJA%40mail.gmail.com.


--
Regards,
Sashidhar Thallam.

Nathan Marz

unread,
Jan 8, 2025, 10:15:13 PMJan 8
to rama...@googlegroups.com
You can just do postsPState.select(Path.key(userId).all()) to convert the subindexed set to an in-memory list.

Reply all
Reply to author
Forward
0 new messages