Configuring Transaction Log feature

Sandeep Mishra

Feb 7, 2018, 5:55:58 AM
to JanusGraph users
Hi Guys,

We are trying to use the transaction log feature of JanusGraph, but it is not working as expected. No callback is received at
public void process(JanusGraphTransaction janusGraphTransaction, TransactionId transactionId, ChangeState changeState) {

The JanusGraph documentation says the value for log.[X].backend is 'default'.
I'm not sure exactly what that means. Does it mean HBase, which we are using as the storage backend for our data?

Please let me know if anyone has configured it.

Thanks and Regards,
Sandeep Mishra

Jason Plurad

Feb 7, 2018, 9:58:41 AM
to JanusGraph users
It means that it will use the 'storage.backend' value as the storage. See the code in GraphDatabaseConfiguration.java. It looks like your only choice is 'default', and it seems like the option is there for the future possibility to use a different backend.

The code in the docs seemed to work ok, other than a minor change in the setStartTime() parameters. You can cut and paste this code into the Gremlin Console to use with the prepackaged distribution.

import java.util.concurrent.atomic.*;
import org.janusgraph.core.log.*;
import java.util.concurrent.*;

graph = JanusGraphFactory.open('conf/janusgraph-cassandra-es.properties');

totalHumansAdded = new AtomicInteger(0);
totalGodsAdded = new AtomicInteger(0);
logProcessor = JanusGraphFactory.openTransactionLog(graph);
logProcessor.addLogProcessor("addedPerson").
        setProcessorIdentifier("addedPersonCounter").
        setStartTime(Instant.now()).
        addProcessor(new ChangeProcessor() {
            public void process(JanusGraphTransaction tx, TransactionId txId, ChangeState changeState) {
                for (v in changeState.getVertices(Change.ADDED)) {
                    if (v.label().equals("human")) totalHumansAdded.incrementAndGet();
                    System.out.println("total humans = " + totalHumansAdded);
                }
            }
        }).
        addProcessor(new ChangeProcessor() {
            public void process(JanusGraphTransaction tx, TransactionId txId, ChangeState changeState) {
                for (v in changeState.getVertices(Change.ADDED)) {
                    if (v.label().equals("god")) totalGodsAdded.incrementAndGet();
                    System.out.println("total gods = " + totalGodsAdded);
                }
            }
        }).
        build()

tx = graph.buildTransaction().logIdentifier("addedPerson").start();
u = tx.addVertex(T.label, "human");
u.property("name", "proteros");
u.property("age", 36);
tx.commit();

If you inspect the keyspace in Cassandra afterwards, you'll see that a separate table is created for "ulog_addedPerson".

Do you have some example code of what you are attempting?

Sandeep Mishra

Feb 18, 2018, 1:00:08 AM
to JanusGraph users
Hi Jason,

Thanks for the prompt reply.
The sample code you posted works well when executed from the Gremlin Console.
However, the Java version doesn't trigger the callback. Probably something is wrong with my code.
Unfortunately, I can't copy code from my office machine.
I will check it again and keep you posted.

Regards,
Sandeep 

Jason Plurad

Feb 18, 2018, 10:14:48 AM
to JanusGraph users
You can use the exact same code in a simple Java program and prove that it works.
I'd think the main thing to watch out for is that your mutations are made on a transaction that has the log identifier set on it.
Is the Gremlin Server involved in your scenario?

tx = graph.buildTransaction().logIdentifier("addedPerson").start();


Sandeep Mishra

Feb 18, 2018, 10:56:49 AM
to JanusGraph users
Both the Groovy and Java code work with berkeleyje as the backend. Tomorrow at the office I will try with HBase as the backend.
Noted on your point.

Thanks and Regards,
Sandeep

Sandeep Mishra

Feb 20, 2018, 9:46:26 AM
to JanusGraph users
Hi Jason,

I tried it with the HBase backend, and control is now passed to the change processor.
I appreciate your help. On careful checking, I noticed that the mutation was happening under the default transaction initiated by JanusGraph rather than the one carrying the log identifier, hence the issue.

However, the problem now is that I am unable to locate a table for the log data.
I took a snapshot of the tables in HBase using the HBase shell before and after processing, but no new table was created.
Any idea what could be wrong? Is there a possibility that it is saving the log data in the janusgraph table meant for the actual data?

Thanks and Regards,
Sandeep

Jason Plurad

Feb 20, 2018, 11:05:10 AM
to JanusGraph users
Not sure what else to tell you. I just tried the same script from before against HBase 1.3.1, and it created the column family 'ulog_addedPerson' right after the logProcessor.addLogProcessor("addedPerson")...build() command was issued.

hbase(main):001:0> describe 'janusgraph'
Table janusgraph is ENABLED
janusgraph
COLUMN FAMILIES DESCRIPTION
{NAME => 'e', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'g', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'h', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'i', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'l', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => '604800 SECONDS (7 DAYS)', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'm', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 's', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 't', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'ulog_addedPerson', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'GZ', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
10 row(s) in 0.0230 seconds

Jason Plurad

Feb 20, 2018, 11:09:14 AM
to JanusGraph users
I suppose it could be just confusion on the terminology:

Cassandra -> Keyspace -> Table
HBase -> Table -> Column Family
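
To make the naming concrete: in this thread the log identifier "addedPerson" showed up as a Cassandra table and as an HBase column family, both named "ulog_addedPerson". Here is a minimal Java sketch of that mapping, assuming the "ulog_" prefix observed above applies to any log identifier. The class and method names are hypothetical helpers for illustration, not part of the JanusGraph API.

```java
public class UserLogStoreName {
    // Prefix as observed in this thread ("addedPerson" -> "ulog_addedPerson").
    // Assumption: the same prefix is applied to every user log identifier.
    static final String OBSERVED_PREFIX = "ulog_";

    /** Returns the store name to look for, given a transaction log identifier. */
    static String storeNameFor(String logIdentifier) {
        return OBSERVED_PREFIX + logIdentifier;
    }

    public static void main(String[] args) {
        String name = storeNameFor("addedPerson");
        // On Cassandra, look for a table with this name inside the graph's keyspace.
        // On HBase, look for a column family with this name inside the graph's table.
        System.out.println(name);
    }
}
```

So on HBase the place to check is the output of describe on the existing graph table, not the list of tables.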

Sandeep Mishra

Feb 25, 2018, 5:31:06 AM
to JanusGraph users
Yeah, Jason. I never looked in the janusgraph table because I was expecting a new table to be created.
I can find the new column family in my setup too.

Thanks and Regards,
Sandeep

anjani...@gmail.com

Jul 9, 2020, 9:25:17 AM
to JanusGraph users
Hi All,

We are using JanusGraph with Cassandra. I am able to capture events using logProcessor and can see the table created in Cassandra.

I was trying to figure out: if logProcessor stops for some reason, how do we get the changes that were made after it stopped?
I tried starting logProcessor with an earlier start time, thinking it would deliver all events committed after that time, but it did not deliver the earlier changes.


Thanks,
Anjani

Sandeep Mishra

Jul 11, 2020, 4:19:02 AM
to JanusGraph users
Hi,

If you are using the same identifier to start the logProcessor, there is no need to explicitly set a previous time.
The logProcessor keeps a marker of the last record read. It should be able to resume from that point.

Do check again.

Regards,
Sandeep

anjani...@gmail.com

Jul 28, 2020, 12:25:35 PM
to JanusGraph users
Thanks Sandeep, yes, it works as you mentioned.
We are using Cassandra as the backend, and the log tables are created in it. The data in them is stored as the blob type.

I was trying to read the blob data from Cassandra but am getting the error below:
"InvalidRequest: Error from server: code=2200 [Invalid query] message="In call to function system.blobastext, value 0xc00000000000000000f38503 is not a valid binary representation for type text"

My query : select blobastext(column1) from ulog_test;

How can we read data stored in ulog tables in Cassandra?

Thanks,
Anjani
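
The error above is what you would expect if column1 holds binary data rather than text: blobAsText only succeeds when the bytes are valid UTF-8. Here is a minimal Java sketch that checks the exact value from the error message with a strict UTF-8 decoder. It only shows why the conversion is rejected; it does not decode the log format, and the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

public class BlobUtf8Check {
    /** Parses a CQL-style hex literal (with or without the 0x prefix) into bytes. */
    static byte[] parseHex(String literal) {
        String hex = literal.startsWith("0x") ? literal.substring(2) : literal;
        byte[] out = new byte[hex.length() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    /** Returns true only if the bytes decode as UTF-8 without any malformed input. */
    static boolean isValidUtf8(byte[] bytes) {
        CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            decoder.decode(ByteBuffer.wrap(bytes));
            return true;
        } catch (CharacterCodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Value copied from the error message in this thread.
        byte[] column1 = parseHex("0xc00000000000000000f38503");
        // The leading byte 0xC0 can never appear in well-formed UTF-8.
        System.out.println("valid UTF-8: " + isValidUtf8(column1));
    }
}
```

Selecting the column without blobAsText returns the raw hex, which is a safer starting point for inspecting these tables.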

Pawan Shriwas

Nov 28, 2020, 12:48:18 AM
to JanusGraph users
Hey Jason,

The same thing is happening to me as well: the script above works well in the Gremlin Console, but when we use it in Java we do not get any callback in process(). Could you help with this?

Sandeep Mishra

Nov 28, 2020, 5:46:55 AM
to janusgra...@googlegroups.com
I have never worked with Cassandra. Perhaps a base64 encoding is used for storing.

--
You received this message because you are subscribed to a topic in the Google Groups "JanusGraph users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/janusgraph-users/JN4ZsB9_DMM/unsubscribe.
To unsubscribe from this group and all its topics, send an email to janusgraph-use...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/janusgraph-users/74e66713-d07c-4cca-a7f8-7faf225fc3c9n%40googlegroups.com.

Sandeep Mishra

Nov 28, 2020, 5:49:20 AM
to janusgra...@googlegroups.com
Pawan,
Can you elaborate more on the program in which you are trying to embed the script?
Regards,
Sandeep


Pawan Shriwas

Nov 28, 2020, 7:46:09 AM
to JanusGraph users
Hi Sandeep,

Please see the Java code and properties below, which I am trying locally with Cassandra CQL as the backend. This code does not deliver the change log events that I do get via the Gremlin Console with the same script and properties. Please let me know if anything needs to be modified in the code or properties.

<!-- Java Code -->
package com.example.graph;

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.log.ChangeProcessor;
import org.janusgraph.core.log.ChangeState;
import org.janusgraph.core.log.LogProcessorFramework;
import org.janusgraph.core.log.TransactionId;

public class TestLog {
    public static void listenLogsEvent() {
        JanusGraph graph = JanusGraphFactory.open("/home/ist/Downloads/IM/jgraphdb_local.properties");
        LogProcessorFramework logProcessor = JanusGraphFactory.openTransactionLog(graph);

        logProcessor.addLogProcessor("TestLog").
            setProcessorIdentifier("TestLogCounter").
            setStartTimeNow().
            addProcessor(new ChangeProcessor() {
                @Override
                public void process(JanusGraphTransaction tx, TransactionId txId, ChangeState changeState) {
                    System.out.println("tx--" + tx.toString());
                    System.out.println("txId--" + txId.toString());
                    System.out.println("changeState--" + changeState.toString());
                }
            }).
            build();
        for (int i = 0; i <= 10; i++) {
            System.out.println("going to add =" + i);
            JanusGraphTransaction tx = graph.buildTransaction().logIdentifier("PawanTestLog").start();
            JanusGraphVertex a = tx.addVertex("TimeL");
            a.property("type", "HOLD");
            a.property("serialNo", "XS31B4");
            tx.commit();
            System.out.println("Vertex committed =" + a.toString());
        }
    }

    public static void main(String[] args) {
        System.out.println("starting main");
        listenLogsEvent();
    }
}

<!----- graph properties------->
gremlin.graph=org.janusgraph.core.JanusGraphFactory
graph.name=TestGraph
storage.backend = cql
storage.hostname = localhost
storage.cql.keyspace=janusgraphcql
query.fast-property = true
storage.lock.wait-time=10000
storage.batch-loading=true

Thanks in advance.

Thanks,
Pawan

Pawan Shriwas

Nov 28, 2020, 7:48:18 AM
to JanusGraph users
One correction to my last post, in the line below:

    JanusGraphTransaction tx = graph.buildTransaction().logIdentifier("TestLog").start();

Sandeep Mishra

Dec 4, 2020, 5:52:51 AM
to JanusGraph users
Pawan,
Can you check your logs for the following: Loaded unidentified ReadMarker start time...
It seems your read marker is starting from 1970, so it tries to read changes since then.

Regards,
Sandeep

Pawan Shriwas

Dec 9, 2020, 7:54:17 AM
to JanusGraph users
Hi Sandeep,

I think I have already added the line below to indicate that the processor should pull changes from now onwards. Is it not working?

 "setStartTimeNow()"

Has anyone else faced the same thing in their Java code?

Thanks,
Pawan

Sandeep Mishra

Dec 12, 2020, 9:26:59 AM
to JanusGraph users
Pawan,
I was able to make your code work. The problem is "setStartTimeNow()".
Use setStartTime(Instant.now()) instead and test again. It works. I have yet to explore the difference between the two APIs.
Make sure to use a new log identifier when you test.

Regards,
Sandeep  

Sandeep Mishra

Dec 15, 2020, 5:03:36 AM
to JanusGraph users
The code explains the behavior. The API sets the start time to null instead of Instant.now(), hence the different behavior.

public LogProcessorBuilder setStartTimeNow() {
    this.startTime = null;
    return this;
}