Replay on crash - how to avoid duplicates?


Vijay Veeraraghavan

May 28, 2014, 05:09:54
to java-ch...@googlegroups.com
I am using Chronicle to log the messages I receive from a java-servlet/smpp/ftp-poller/email-poller and a few more sources. I write all the requests (without assembling them) to the chronicle. A reader reads the chronicle, processes the messages, and deposits them into the database. Now suppose the application crashes, say the chronicle-reader. Since the messages are persisted, I can replay all of them. But there is a problem: I cannot replay the whole chronicle, because when I do, I get duplicates (i.e., the reader reprocesses everything it had already processed). So how do I keep track of how many messages I have processed, so that on replay I process only the unread ones? Do I need to maintain this "index location" somewhere outside, so the reader can check it when it recovers from the crash? How have you implemented replay?
Now, say the chronicle-writer has crashed. Does the chronicle start appending new messages at the end? If so, does the chronicle-reader get the last written index on nextIndex(), or the first one? Or does it start with a new index every time? (I need to simulate this scenario.)
thanks

--Vijay Veeraraghavan

Karlis Zigurs

May 28, 2014, 05:17:21
to java-ch...@googlegroups.com
Ultimately this depends on where the outcome of processing the messages is stored. If the results are written to some filesystem or database in a transformed format, the processing part should store a "sourceIndexProcessed" (or similar) field with the results, which it can look up when recovering from a crash to seek to the correct (n + 1) record and resume processing. Keeping the link between source index and results also guarantees that your index doesn't get out of sync if there is a crash or data loss on the consumer side: lost data is dealt with simply by returning to the last known index.

K



--
You received this message because you are subscribed to the Google Groups "Java Chronicle" group.
To unsubscribe from this group and stop receiving emails from it, send an email to java-chronicl...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Vijay Veeraraghavan

May 28, 2014, 06:33:14
to java-ch...@googlegroups.com
Thanks Zigurs for your fast reply.
So the "source index processed" can be the actual "index" value (in my case, I will store this value to a file)? Can you list some more fields that could help me in the recovery? I use VanillaChronicle, version 3.0.1.

Karlis Zigurs

May 28, 2014, 07:20:44
to java-ch...@googlegroups.com
Now, this has little to do with Chronicle specifically (it's just the basics of event sourcing, log replay and snapshotting deterministic state machines), so my apologies to other list members, but I'm in a good mood today ;) - let's look at two possible examples:

- First example: You are writing your raw log to a chronicle (or any other log file), and you have a processor which looks for particular request types and, every time it sees one, writes it to a database (for example to a table: RequestType|RequestUser|RequestParameter). In this case, if you suffer a crash, your processing node has no way to know where to look in the log for the next entry, and you would be forced to compare records in the database against those you generate while reprocessing the log from the beginning. Not a good thing to do.

If instead you add a column "ChronicleIndexProcessed" to your table (containing the chronicle entry index you processed the record from), so that it now looks like RequestType|RequestUser|RequestParameter|ChronicleIndexProcessed, and write the index of the chronicle record you are processing every time you decide to write to the database, then when recovering from a crash you can first read the highest processed index from your database ("select max(ChronicleIndexProcessed) from MyTable") and start reading the chronicle log from that index + 1 (or 0 if there are no records in the database). This guarantees that you process any relevant entries only once: you always start reading from the index in the chronicle that is next after the one you know you have seen and processed, and any entries you might read twice are of no significance, since they generated no records the first time. An alternative way to think of this would be LastTimestampProcessed, but I'd generally discourage using timestamps for this, as they have no guarantee of always being unique and incremental.
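A minimal sketch of this resume-from-max-index idea, in plain Java: a List stands in for both the chronicle log and the database table, and every name here is illustrative, not Chronicle API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each stored row remembers the source log index it came
// from, and recovery resumes from maxProcessedIndex + 1.
class ResumeSketch {
    static class Row {
        final String payload;
        final long chronicleIndexProcessed; // the source log index this row came from
        Row(String payload, long index) {
            this.payload = payload;
            this.chronicleIndexProcessed = index;
        }
    }

    // Equivalent of "select max(ChronicleIndexProcessed) from MyTable";
    // -1 when the table is empty, so processing starts at index 0.
    static long maxProcessedIndex(List<Row> db) {
        long max = -1L;
        for (Row r : db) max = Math.max(max, r.chronicleIndexProcessed);
        return max;
    }

    // On recovery, replay the log from maxProcessedIndex + 1 only,
    // so already-stored entries are never duplicated.
    static void resume(List<String> log, List<Row> db) {
        for (long i = maxProcessedIndex(db) + 1; i < log.size(); i++)
            db.add(new Row(log.get((int) i), i));
    }
}
```

Running resume() twice in a row is harmless, which is the whole point: the stored index makes replay idempotent.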

- Second example: You are processing a log file (again, let's say records in a chronicle) which contains addition or subtraction operations on a single variable (x=100; x=x+5; x=x+1; x=x-42; x=x+3; ...). In this case you could periodically (every 1000 records, once per day, etc.) store the current value of x in a file named snapshotOfX_<last processed chronicle index>.snapshot (or add the chronicle index value as a second line in the file, binary-encode it in the file metadata, name the file using the index encoded in hex with an appended checksum byte - the possibilities are endless). Upon restarting, you would read the list of your files, look up the file with the highest processed chronicle index, read the value of x stored in it, and continue processing the chronicle log from the index specified in the file + 1. This leads you to the same result as if you had replayed the log from the beginning.
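A sketch of that snapshot-and-replay recovery, under the assumption that each log record is a delta applied to x; a TreeMap keyed by "last applied index" stands in for the snapshotOfX_<index>.snapshot files on disk.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: rebuild x by loading the newest snapshot (if any),
// then replaying only the tail of the log after that snapshot's index.
class SnapshotSketch {
    static int recover(List<Integer> deltas, NavigableMap<Long, Integer> snapshots) {
        long from = 0;
        int x = 0;
        if (!snapshots.isEmpty()) {
            from = snapshots.lastKey() + 1;       // resume at snapshot index + 1
            x = snapshots.get(snapshots.lastKey());
        }
        for (long i = from; i < deltas.size(); i++)
            x += deltas.get((int) i);             // replay only the unapplied tail
        return x;
    }
}
```

Recovering from a snapshot and replaying the full log must give the same value of x; that equivalence is what makes the snapshot safe to use.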

In both examples above you are creating some kind of snapshot. If it lives in the database, then your snapshot is the database itself, continuously evolving; if in files, a series of older and older snapshots. In both cases you want to associate the chronicle/log index that led to the current snapshot with the snapshot directly. If you need to perform more advanced or multiple processing steps that can span different indexes in the chronicle, it may be best to first think of them as separate processing paths, and only when you are comfortable that you can manage their state independently think of merging them into a shared processing stage.

On a last note - if you are only just starting with Chronicle, I would avoid VanillaChronicle outright and use IndexedChronicle for prototyping first. The code in the current version of VanillaChronicle is (in my opinion) far too dense, and too lacking in comments, to be tackled outright if you are not fully comfortable with the concepts and code paths that make IndexedChronicle tick.

K


Kieron Wilkinson

May 28, 2014, 08:47:33
to java-ch...@googlegroups.com
Hi Vijay,

A technique that works really well for us is to have the writer, appending to the chronicle, write out a flag as part of the message to the stream, e.g. "processed".

Our "reader" (that logs to the database) then processes each entry, and when the database insert is successful, we go to the flag's position, write it as true, and then call finish.

When the system restarts, the reader simply reads up to the next unprocessed entry and starts logging again.

This means we don't have any external tracking of the chronicle progress.

It does mean you effectively have two writers in different threads, but this is okay, as only one is appending; the other is just updating some data in an existing entry. At least, it works well with IndexedChronicle.

Kieron

Vijay Veeraraghavan

May 29, 2014, 01:08:19
to java-ch...@googlegroups.com
Zigurs
Thanks for your elaborate and neat explanation. I will try both the cases and see which one would help us best within the time frame I have.
Thanks
--Vijay

Vijay Veeraraghavan

May 29, 2014, 03:05:32
to java-ch...@googlegroups.com
Thanks Wilkinson
I will try this too. This approach looks simple, as it needs no external component. 
--Vijay

Vijay Veeraraghavan

Jun 2, 2014, 01:18:23
to java-ch...@googlegroups.com
Wilkinson
In the Excerpt I write the details, with a boolean flag set to false. In the reader I check this flag; if false, I process the message and then update the flag to true, else skip. With very little documentation available, I managed to find that this code updates the last boolean field. Is this how it should be updated?
boolean complete = tailer.readBoolean();
if (!complete) {
    try {
        tailer.writeBoolean(tailer.position() - 1, true);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        tailer.finish();
    }
}
regards
--V

Kieron Wilkinson

Jun 2, 2014, 03:52:02
to java-ch...@googlegroups.com

Hi Vijay,

I assume that would work, but you probably want to use an Excerpt rather than an ExcerptTailer if you're modifying the stream. Peter can correct me on this, but my understanding is that the tailer has some optimisations for appending, so it doesn't seem right to use it to modify previous excerpts.

I now remember what I did was a bit more involved because I wanted to have a "consuming" reader that provides these core features:

1) A "next()" method to read the next item. Repeated calls to "next()" return the same item until it is "consumed". This lets you read an item and try to process it, while still allowing the item to be reprocessed for retries.

2) A "consume()" method that marks the last read item as finished with (either successfully processed, or skipped).

3) A "resume()" method that reads through the Chronicle stream from the start until it reaches the next unprocessed transaction. This is perhaps slightly naïve, but it's simple, and since it only reads a flag at the beginning of each index to see whether it has been processed yet, it is super fast (at least for my purposes).
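A rough in-memory model of such a reader might look like the following. This is hypothetical: the names (next, consume, resume) follow the description above, but the backing store is just a List with a processed flag rather than a real Chronicle.

```java
import java.util.List;

// Sketch of a "consuming" reader: next() is sticky until consume() is called,
// and resume() skips past everything already marked processed.
class ConsumingReader {
    static class Entry {
        final String value;
        boolean processed;
        Entry(String value) { this.value = value; }
    }

    private final List<Entry> log;
    private int cursor = -1;
    private boolean reading;

    ConsumingReader(List<Entry> log) { this.log = log; }

    // Scan from the start, skipping entries whose processed flag is already set.
    void resume() {
        cursor = -1;
        reading = false;
        while (cursor + 1 < log.size() && log.get(cursor + 1).processed)
            cursor++;
    }

    // Returns the same entry until consume() is called, so processing can be retried.
    String next() {
        if (!reading) {
            cursor++;
            reading = true;
        }
        return cursor < log.size() ? log.get(cursor).value : null;
    }

    // Mark the last read entry as finished with (processed or skipped).
    void consume() {
        if (reading && cursor < log.size()) {
            log.get(cursor).processed = true;
            reading = false;
        }
    }
}
```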

I think this might be generally useful, so I'll see if I can find some time to blog the details, as I've seen various people asking how to do the same sort of thing. I'll post back with a link, but it may take a couple of days.

Kieron


Peter Lawrey

Jun 2, 2014, 04:12:55
to java-ch...@googlegroups.com

I suspect what you really want is a CAS operation, which has the advantage of being thread safe (although it uses an int).

You can do

if (excerpt.compareAndSwapInt(0L, 0, 1)) // did I set it from 0 to 1?

This has the benefit of exclusively updating the record. To make the field more useful you can add the time. E.g.

static final int SENT_OFFSET = 0;
static final int RECV_OFFSET = 8;
static final int MSG_STARTS = 16;

On the reading side you can do

if (tailer.compareAndSwapLong(RECV_OFFSET, 0L, System.nanoTime())) {
    // updated with the processing time
}

A Tailer can update a record, but only the last record it is reading - no random access. An Excerpt can access any record, but moving between records is more heavyweight - 3-4x the cost. Given that reading only the next record is a common use case, the Tailer was added to optimise this usage.

Once moved, they behave the same.

The thread safety works for one machine, i.e. across processes, but not with replication. The change is only guaranteed to be visible on the machine where you performed it.

(Programmed on my phone so you might have to correct a couple of things ;)


Vijay Veeraraghavan

Jun 3, 2014, 03:39:00
to java-ch...@googlegroups.com
Ok, so how about this?
if (unprocessed) {
    try {
        Excerpt excerpt = tailer.chronicle().createExcerpt();
        excerpt.index(tailer.index());
        excerpt.writeBoolean(tailer.position() - 1, true);
        excerpt.finish();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        tailer.finish();
    }
}
regards

--V 

Vijay Veeraraghavan

Jun 3, 2014, 03:48:53
to java-ch...@googlegroups.com
How does this work?
tailer.zeroOut(start, end);
where start is 0 and end is the end of that tailer?
I restarted the reader after the crash and found every message I read was empty. Is this a good way?

regards
--V

Peter Lawrey

Jun 3, 2014, 09:46:59
to java-ch...@googlegroups.com
You could do that, but I would just update the Tailer, and not use an Excerpt unless you actually need random access.



Peter Lawrey

Jun 3, 2014, 09:48:38
to java-ch...@googlegroups.com
zeroOut is to blank out a block of memory.  I am not sure it is useful for Chronicle, but you might have found a use for it.

> I restarted the reader from the crash and found every message I read was empty

Are you saying this is what happened after a crash, or what happened after you used zeroOut?



Vijay Veeraraghavan

Jun 5, 2014, 03:25:55
to java-ch...@googlegroups.com
Lawrey
Sorry for my late reply.
Yes. 
All my reader does is read the message, if there is one, and zeroOut it after processing. I programmatically terminate the reader at a random time. When I restart the reader, as usual it starts reading from the first entry. Every excerpt it reads is empty, but only up to the crash-index location, after which the messages are present and they are consumed and processed.
regards
--V

Peter Lawrey

Jun 5, 2014, 06:25:47
to java-ch...@googlegroups.com

Instead of zeroing out the whole record you can add a byte which determines if a record has been processed. If you use an int, you can do a CAS to allocate the message to one of multiple consumers.
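A sketch of that multi-consumer CAS claim, with an AtomicIntegerArray standing in for the per-record int headers (illustrative only, not the Chronicle API):

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

// Each record carries an int header that starts at 0; consumers CAS it to
// their own id, and only the winner processes the record. The stored value
// also records which consumer claimed it.
class ClaimSketch {
    // Try to claim record i for the given consumer; true iff this consumer won.
    static boolean claim(AtomicIntegerArray headers, int i, int consumerId) {
        return headers.compareAndSet(i, 0, consumerId);
    }
}
```

Because compareAndSet succeeds for exactly one caller per record, the same header both allocates the work and survives a crash as a durable processed marker.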

Vijay Veeraraghavan

Jun 5, 2014, 06:35:23
to java-ch...@googlegroups.com
Okay Peter, then I would do this.

// producer
....
appender.writeInt(0); // indicator
appender.finish();
....

// consumer
....
boolean compareAndSwapInt = tailer.compareAndSwapInt(
        tailer.position(), 0, 1); // this returned true on the first read and false on subsequent reads
if (compareAndSwapInt) {
    process();
}
....

regards
--V

Peter Lawrey

Jun 5, 2014, 06:39:27
to java-ch...@googlegroups.com

If you use position 0L you can do

if (excerpt.compareAndSwapInt(0L, 0, myid)) {
    processMessage(excerpt);
}

The cool thing about this approach is that it works even if you have multiple readers, and you can see which reader read the message.

Vijay Veeraraghavan

Jun 5, 2014, 06:54:41
to java-ch...@googlegroups.com
Thanks, Peter, for suggesting this idea. Should the writer thread write anything?
I tried this in my code:
excerpt.compareAndSwapInt(0L, 0, 1); // but it returns false?

regards
--V

Peter Lawrey

Jun 5, 2014, 06:56:06
to java-ch...@googlegroups.com

The writer has to write some padding for the reader to overwrite. The CAS will fail if the value is not already 0.

Vijay Veeraraghavan

Jun 5, 2014, 07:03:16
to java-ch...@googlegroups.com
But does VanillaChronicle support padding?

Peter Lawrey

Jun 5, 2014, 07:19:31
to java-ch...@googlegroups.com

It is clearer if you write out your padding with a write of 0. You could just move the position, but that is not as clear unless you have a lot of padding.

Vijay Veeraraghavan

Jun 5, 2014, 07:25:40
to java-ch...@googlegroups.com
In the writer I do
appender.writeInt(0); // indicator
....
....
appender.finish();


and in the reader
while (excerpt.nextIndex()) {
    excerpt.readInt(); // without this I get exceptions reading the subsequent excerpts
    boolean compareAndSwapInt = excerpt.compareAndSwapInt(0L, 0, 1);
    if (!compareAndSwapInt) {
        continue;
    }
    process(excerpt);
}
Correct me if I am wrong somewhere.
regards
--V

Peter Lawrey

Jun 5, 2014, 07:53:35
to java-ch...@googlegroups.com

Correct, though you can put the readInt after the CAS is successful. Or you can move the position with excerpt.position(4).

Vijay Veeraraghavan

Jun 5, 2014, 10:45:40
to java-ch...@googlegroups.com
Do I pad zeros to the left of the message, or to the right?
Is this your idea - that the readers update their specific region in the padding?

regards
--V

Peter Lawrey

Jun 5, 2014, 17:55:56
to java-ch...@googlegroups.com
Before the message is easier. All the readers would attempt to update this padded area, but only one would succeed, and that is the one which will process it.