Replication Task for Map

Kent Hoxsey

Feb 25, 2015, 4:44:48 PM
to java-ch...@googlegroups.com
I am working with Chronicle Map for a monitoring setup, and have a question about good- or best-practice for setting up replication.

My base case uses two processes. The first listens to my event stream and updates the value objects in my map, the second reads the map and updates an operator (GUI) display. So far in my development, I have been doing this on the same machine using a persisted file, and it works swimmingly. I am now ready to split them up so the operator display can run remote from the actual data collection.

The Technical Guide suggests that one may run two maps on the same server, only one of which is replicated:

On a server if you have a number of Java processes and then within each Java process you create an instance of a ChronicleMap which binds to the same underlying 'file', they exchange data via shared memory rather than TCP or UDP replication. So if a ChronicleMap which is not performing TCP Replication is updated, this update can be picked up by another ChronicleMap. This other ChronicleMap could be a TCP replicated ChronicleMap. In such an example the TCP replicated ChronicleMap would then push the update to the remote nodes.

I assume the reason to do this is to split out the replication processing (which may be servicing a throttled consumer) from more critical processing operating at full speed, operating it as a service to the other map users. Indeed, this is exactly what I would like to do. Which brings me to two questions:

1. Am I reading the Technical Guide correctly that setting up a replication service is reasonable?

2. What is the best thing to do in the main() of my "replication" program? Calling Thread.sleep() seems not quite right, although maybe I've been ruined by forms apps and message pumps. Looking through the tests, I don't see anything that provides a good model for "service pending map events".

Kent

--- some code, pretty much boilerplate

    public static void main(String[] args) {
        final ChronicleMap<Integer, Double> map;

        String remoteAddress = args[0];

        ChronicleMapBuilder<Integer, Double> builder = ChronicleMapBuilder
                .of(Integer.class, Double.class).entries(100_000)
                .replication(configureReplication(1234, 5678, remoteAddress));

        map = builder.create();

        //start replicating -- *** what to do here? ***
        //Thread.sleep() doesn't seem like the right thing

        map.close();
    }

Rob Austin

Feb 25, 2015, 4:57:14 PM
to java-ch...@googlegroups.com
Thanks for your question. If it's ok, I'll give you an answer tomorrow. 

Rob

Kent Hoxsey

Feb 25, 2015, 5:08:38 PM
to java-ch...@googlegroups.com
Hi Rob. Yes, of course tomorrow is fine! Thanks for all the fine work!

Rob Austin

Feb 26, 2015, 4:11:12 AM
to java-ch...@googlegroups.com
See my comments below

1. Am I reading the Technical Guide correctly that setting up a replication service is reasonable?

Yes, I think so; you should use TCP replication. Will you split the service so that you have a single service on each machine? If you have (for example) one service on one machine and two services on the other, I can also show you the config for that. I assume it's just one service on each machine, so see https://github.com/OpenHFT/Chronicle-Map#example--replicating-data-between-process-on-different-servers-via-tcpip


2. What is the best thing to do in the main() of my "replication" program? Calling Thread.sleep() seems not quite right, although maybe I've been ruined by forms apps and message pumps. Looking through the tests, I don't see anything that provides a good model for "service pending map events".


I may be missing something or misunderstanding your question, but Chronicle Map takes care of the replication on a background thread, so you don't have to worry about "servicing pending map events"; Chronicle handles this for you. If you want to know when new events arrive (in other words, to be notified of changes), you can use the MapEventListener.

Regarding your code below: I've put my comments inline.


Kent

--- some code, pretty much boilerplate

    public static void main(String[] args) {
        final ChronicleMap<Integer, Double> map;

        String remoteAddress = args[0];

        ChronicleMapBuilder<Integer, Double> builder = ChronicleMapBuilder
                .of(Integer.class, Double.class).entries(100_000)
                .replication(configureReplication(1234, 5678, remoteAddress));

I'm not sure what configureReplication() is, but you should do something like this (see https://github.com/OpenHFT/Chronicle-Map#example--replicating-data-between-process-on-different-servers-via-tcpip):

            TcpTransportAndNetworkConfig tcpConfig = TcpTransportAndNetworkConfig
                    .of(8076, new InetSocketAddress("localhost", 8077))
                    .heartBeatInterval(1L, TimeUnit.SECONDS);
            ChronicleMapBuilder<Integer, CharSequence> map1Builder =
                    ChronicleMapBuilder.of(Integer.class, CharSequence.class)
                            .replication((byte) 1, tcpConfig);



        map = builder.create();

        //start replicating -- *** what to do here? ***
You should not have to do anything here; Chronicle will handle the replication for you.

        //Thread.sleep() doesn't seem like the right thing

This is typically where your business logic would go (maybe in an event loop), and yes, before you shut down your application it's good practice to call map.close().

I would suggest looking at some of our test cases like net.openhft.chronicle.map.TCPSocketReplicationTest
 

Rob

Kent Hoxsey

Feb 26, 2015, 10:58:34 AM
to java-ch...@googlegroups.com
Thanks for the detailed workup, Rob. I appreciate all the effort you and your team have put into documentation and test code, I've learned quite a bit about how to use the tools from reading it.

To follow up on your question above, I am trying to run 2 services on one machine, and one service on the other. I would welcome any pointers you had for handling this case.

Rob Austin

Feb 26, 2015, 11:20:13 AM
to java-ch...@googlegroups.com
Sure. I will definitely update our documentation in this area (https://github.com/OpenHFT/Chronicle-Map#multiple-processes-on-the-same-server-with-replication) and give more detail on how to do this, but please give me a couple of days, as I'm in the middle of some development. Off the top of my head, I think it's something like this:


The trick is to use replication but specify just the identifier, like this: .replication(server1Identifier). See the example pseudocode below.


TcpTransportAndNetworkConfig tcpConfig1 = ...
TcpTransportAndNetworkConfig tcpConfig2 = ...
byte server1Identifier = 1;
byte server2Identifier = 2;


map1Server1 = ChronicleMapBuilder
    .of(Integer.class, CharSequence.class)
    .replication(server1Identifier, tcpConfig1) // used to provide just the server's port
    .createPersistedTo(file); // must be the same file as used by map2Server1


map2Server1 = ChronicleMapBuilder
    .of(Integer.class, CharSequence.class)
    .replication(server1Identifier) // notice NO tcpConfig, so this map gets its updates via map1Server1 through the shared file; server1Identifier ** must ** be the same as for map1Server1
    .createPersistedTo(file); // must be the same file as used by map1Server1


Other server:


map3Server2 = ChronicleMapBuilder
    .of(Integer.class, CharSequence.class)
    .replication(server2Identifier, tcpConfig2) // used to connect to server1
    .createPersistedTo(file); // a file local to server 2; any other map on server 2 must use this same file

Rob Austin

Feb 26, 2015, 11:28:29 AM
to java-ch...@googlegroups.com
One more thing: if your application is performance critical, then this process (map2Server1) is the best one to put your business-critical data on, as it does not do any TCP replication. Having said that, without thread affinity the resources of the machine are likely to be shared across all your processes, so it probably won't make much difference, but I thought you might be interested.

Rob

Kent Hoxsey

Feb 26, 2015, 12:28:40 PM
to java-ch...@googlegroups.com
Ah! That makes perfect sense, both in terms of configuration (no tcpConfig) and intent.

So if I have my business-critical processing happening in map2server1, what is a good pattern for the code to run in map1server1? I envision that its purpose is to facilitate the replication processing, but I am not sure how best to handle this. I am wondering about something like this (and particularly curious about what else might need to happen in the run() method):

public static void main(String[] args) {

    Thread replicator = new Replicator();
    replicator.start();

}

class Replicator extends Thread {
    ChronicleMapBuilder builder = ...
    ChronicleMap map = builder.create();

    public void run() {
        //should there be something else here?
        Thread.yield();
    }
}

Rob Austin

Feb 26, 2015, 12:35:52 PM
to java-ch...@googlegroups.com
You don't have to create a Replicator thread; Chronicle Map will do this for you. In its simplest form, all you have to do in your main thread is create the instance of the map and then wait (or block) until you are ready to shut down, then call close(). In the background, Chronicle will keep all the maps in sync.
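
Editor's sketch of the "create, block, close" pattern described in this reply, using only the JDK. It is not taken from the Chronicle Map code base: StandInMap is a hypothetical placeholder for the real ChronicleMap (which would be built with ChronicleMapBuilder as shown earlier in the thread), and runUntilShutdown is an illustrative helper name. The main thread parks on a latch instead of looping on Thread.sleep(), and a JVM shutdown hook releases the latch so that close() still runs on Ctrl-C.

```java
import java.util.concurrent.CountDownLatch;

public class ReplicationHost {

    /** Hypothetical stand-in for the replicated map; only close() matters here. */
    static final class StandInMap implements AutoCloseable {
        volatile boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Blocks the calling thread until the latch is released, then closes the map. */
    static void runUntilShutdown(AutoCloseable map, CountDownLatch shutdown) throws Exception {
        try (AutoCloseable m = map) {
            shutdown.await(); // parked here; no polling and no sleep loop
        } // close() is called when this scope exits
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch shutdown = new CountDownLatch(1);
        Thread mainThread = Thread.currentThread();

        // On Ctrl-C / SIGTERM: release the latch, then give main time to close the map.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            shutdown.countDown();
            try {
                mainThread.join(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        runUntilShutdown(new StandInMap(), shutdown);
    }
}
```

In a real replication process the StandInMap would be replaced by the map created from the builder; nothing else needs to run in main, because replication happens on Chronicle's own background thread.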

Kent Hoxsey

Feb 26, 2015, 12:37:20 PM
to java-ch...@googlegroups.com
Great, so easy. Thanks!

Kent Hoxsey

Mar 3, 2015, 5:22:12 PM
to java-ch...@googlegroups.com
I am having an issue with replication and am unsure what I am doing wrong. I have implemented as directed earlier in this thread, and everything seems to be configured properly. At the moment, I am propagating changes in one direction only, producer to consumer. When I start up the producer I can see that it is updating the map as one would expect. And when I start up the consumer I see the bootstrap occur. But then it seems like the changes stop flowing.

Looking at the packet exchange in Wireshark, I can see the data for the initial bootstrap (my keys are simple CharSequence, so visible in the packets). Once the bootstrap is finished, the packet exchange looks like only the heartbeat. Weirdly, every now and then it will update the data for one key.

I am using Chronicle Map 2.1.1. The data is a value object generated using a static interface (ala the VO interface examples). The issue is occurring between maps running on Win64 machines.

Any pointers for driving the changes would be appreciated.

Rob Austin

Mar 4, 2015, 6:55:02 AM
to java-ch...@googlegroups.com
Thanks for your email. Could you check that you have assigned a different, unique identifier to each host?
Could you also send us a small JUnit test that reproduces this problem? Ideally it would use localhost, with different ports, to simulate each host.

thanks

Rob

Kent Hoxsey

Mar 5, 2015, 12:28:08 AM
to java-ch...@googlegroups.com
I am not sure I have the chops to create a JUnit test that runs two concurrent maps. But the attached simple 1-pager illustrates the problem. A producer and a consumer, each with their own persisted map file, communicating via TCP.  It is easy to see from the output that the producer notifies the consumer about each new entry in the map, but does not continue to refresh the entries as they change.

Snippet of the two outputs (can see here the Consumer started one iteration later):

Producer:
 a:1 
 b:1  a:2 
 b:2  c:1  a:3 
 d:1  b:3  c:2  a:4 
 d:2  b:4  c:3  e:1  a:5 
 d:3  b:5  c:4  f:1  e:2  a:6 
 d:4  b:6  g:1  c:5  f:2  e:3  a:7 
 d:5  b:7  g:2  c:6  f:3  h:1  e:4  a:8 
 d:6  b:8  g:3  c:7  i:1  f:4  h:2  e:5  a:9 
 j:1  d:7  b:9  g:4  c:8  i:2  f:5  h:3  e:6  a:10 

Consumer:
 b:1  a:1 
 b:1  c:1  a:1 
 d:1  b:1  c:1  a:1 
 d:1  b:1  c:1  e:1  a:1 
 d:1  b:1  c:1  f:1  e:1  a:1 
 d:1  b:1  g:1  c:1  f:1  e:1  a:1 
 d:1  b:1  g:1  c:1  f:1  h:1  e:1  a:1 
 d:1  b:1  g:1  c:1  i:1  f:1  h:1  e:1  a:1 
 j:1  d:1  b:1  g:1  c:1  i:1  f:1  h:1  e:1  a:1 
Main.java

Rob Austin

Mar 5, 2015, 11:17:30 AM
to java-ch...@googlegroups.com
Try connecting them like this:

TcpTransportAndNetworkConfig tcpConfig = !isProducer
        ? TcpTransportAndNetworkConfig.of(8111, new InetSocketAddress("localhost", 8222))
        : TcpTransportAndNetworkConfig.of(8222);



Kent Hoxsey

Mar 5, 2015, 12:12:50 PM
to java-ch...@googlegroups.com
That does not change the behavior.

Which makes sense to me. It is clear that new entries in the map are getting replicated, so it is not the nature of the TCP connection that is the problem. It looks as though the changes to the value object are not setting a "dirty" flag properly, or something.

Rob Austin

Mar 5, 2015, 12:55:28 PM
to java-ch...@googlegroups.com
Kent 

Thanks for your email. Could you change your code to use:

map.acquireUsingLocked

See the docs at https://github.com/OpenHFT/Chronicle-Map/tree/2.1#acquireusinglocked

See the example below and other example test cases in net.openhft.chronicle.map.CHMUseCasesTest.

getUsing() should really only (in most cases) be used to read a value, not to change it, as it only holds a read lock.

If you intend to change the value that you have read, you can use acquireUsingLocked (the locks mean no other process or thread can change it on the same machine at the same time).

The dirty flag is triggered on exit from this scope, and the lock is released at the same time:
try (WriteContext wc = map.acquireUsingLocked("1", using)) {
    assertTrue(using instanceof StringBuilder);
    ((StringBuilder) using).append("Hello World");
} // dirty flag triggered here, lock also released


Rob

see example :

@Test
public void testAcquireUsingWithCharSequence() throws IOException {

    if (typeOfMap == TypeOfMap.STATELESS)
        return; // getUsingLocked is not supported by the STATELESS client

    ChronicleMapBuilder<CharSequence, CharSequence> builder = ChronicleMapBuilder
            .of(CharSequence.class, CharSequence.class);

    try (ChronicleMap<CharSequence, CharSequence> map = newInstance(builder)) {

        CharSequence using = map.newValueInstance();

        // mutate the value inside the write context so the change is flagged for replication
        try (WriteContext wc = map.acquireUsingLocked("1", using)) {
            assertTrue(using instanceof StringBuilder);
            ((StringBuilder) using).append("Hello World");
        }

        assertEquals("Hello World", map.get("1"));
        mapChecks();
    }
}
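
Editor's sketch: a toy model, using only the JDK, of why a change made through a read-style accessor is never replicated while a change made inside a locked write scope is. This is not Chronicle Map's implementation. Entry, WriteScope, getUsing() and acquireLocked() are hypothetical names chosen to mirror the behaviour described above, where the dirty flag is set and the lock is released when the try-with-resources scope exits.

```java
import java.util.concurrent.locks.ReentrantLock;

public class WriteContextSketch {

    /** Toy entry: a mutable value plus the flag an imaginary replicator would poll. */
    static final class Entry {
        final StringBuilder value = new StringBuilder();
        final ReentrantLock lock = new ReentrantLock();
        volatile boolean dirty = false;

        /** Read-style access: hands out the value and records nothing. */
        StringBuilder getUsing() {
            return value;
        }

        /** Write-style access: takes the lock now; close() marks dirty and unlocks. */
        WriteScope acquireLocked() {
            lock.lock();
            return new WriteScope(this);
        }
    }

    /** Scope object whose close() publishes the change and releases the lock. */
    static final class WriteScope implements AutoCloseable {
        private final Entry entry;

        WriteScope(Entry entry) {
            this.entry = entry;
        }

        StringBuilder value() {
            return entry.value;
        }

        @Override
        public void close() {
            entry.dirty = true;  // the change is now flagged for replication
            entry.lock.unlock(); // the lock is released at the same point
        }
    }

    public static void main(String[] args) {
        Entry e = new Entry();

        // Mutating through the read-style accessor changes the local value only.
        e.getUsing().append("1");
        System.out.println("after getUsing mutation, dirty = " + e.dirty); // false

        // Mutating inside the locked scope flags the entry when the scope exits.
        try (WriteScope ws = e.acquireLocked()) {
            ws.value().append("2");
        }
        System.out.println("after locked scope, dirty = " + e.dirty); // true
    }
}
```

This matches the symptom reported earlier in the thread: new keys reached the consumer, but later in-place changes to existing values did not.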







Rob Austin

Mar 5, 2015, 12:58:56 PM
to java-ch...@googlegroups.com
correction  

(the locks mean that other processes or threads can't change it on the same machine at the same time)

Kent Hoxsey

Mar 5, 2015, 1:08:36 PM
to java-ch...@googlegroups.com
That solves it. Thanks so much for the help.