Compaction and duplicate data


Hugo

Apr 13, 2012, 3:19:02 AM
to OpenTSDB
Heya,

How does the compaction system deal with duplicate datapoints?

We use daily batch imports of data, and we import more than 24 hours'
worth to make sure we don't miss any points. Without compaction this is
no problem, as HBase/OpenTSDB will just overwrite the existing datapoint
with a new version. However, I wonder what happens with compacted data
if a new datapoint is added to an already existing, but compacted, row.

The reason I'm asking is that I've been seeing some weird graphs since
enabling compaction, and I got this error on one of the queries:

net.opentsdb.core.IllegalDataException: Found out of order or
duplicate data: cell=Cell([75, 11], [64, -38, 42, -65]), delta=1200,
prev cell=Cell([75, 11], [64, -38, 38, -64]), last_delta=1200, in
row=[KeyValue(key=[0, 0, 42, 79, -124, -13, -80, 0, 0, 1, 0, 0, -61,
0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4, 0, 18, 15],
family="t", qualifier=[0, 11], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[0, 11, 18, -53, 37, -117, 56, 75,
75, 11, 93, -53, 112, -117, -125, 75, -106, 11, -88, -53, -69, -117,
-50, 75], value="@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00@\xDA&\xC0@P\xAFS@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00\x00",
timestamp=1334287886985), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[18, -53], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="%\x8B", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="8K", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="K\x0B", value="@\xDA*\xBF",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="]\xCB", value="@P\xAF\x86",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="p\x8B", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="\x83K", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[-106, 11], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[-88, -53], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[-69, -117], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="\xCEK", value="@@\x00\x00",
timestamp=1334290799915)] -- run an fsck.

Cheers,

Hugo

--
Tsune ni ite, kyu ni awasu.

Hugo
hu...@strocamp.net

tsuna

Apr 13, 2012, 4:41:29 AM
to Hugo, OpenTSDB
On Fri, Apr 13, 2012 at 12:19 AM, Hugo <hu...@trippaers.nl> wrote:
> How does the compaction system deal with duplicate datapoints?

It ignores truly duplicate datapoints (same metric, tags, time, value)
but it complains in cases where the values differ.
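
A rough sketch of that rule, purely as an illustration (this is not the
actual CompactionQueue code): when two cells in a row decode to the same
time offset, identical value bytes are silently dropped, while differing
values raise the kind of error you saw.

import java.util.Arrays;

// Illustration only, not OpenTSDB's actual compaction code.
final class DuplicateCheck {
  /** @return true if the colliding cell can be silently dropped. */
  static boolean handleCollision(final byte[] valueA, final byte[] valueB) {
    if (Arrays.equals(valueA, valueB)) {
      return true;  // truly identical duplicate: ignore it
    }
    // Same metric, tags and time but different values: complain.
    throw new IllegalStateException("Found out of order or duplicate data");
  }
}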

> We use daily batch imports of data, and we import more than 24 hours'
> worth to make sure we don't miss any points. Without compaction this is
> no problem, as HBase/OpenTSDB will just overwrite the existing datapoint
> with a new version. However, I wonder what happens with compacted data
> if a new datapoint is added to an already existing, but compacted, row.

As long as you re-import exactly the same data plus some additional
data points, you're fine.

> The reason I'm asking is that I've been seeing some weird graphs since
> enabling compaction, and I got this error on one of the queries:
>
>        net.opentsdb.core.IllegalDataException: Found out of order or
> duplicate data: cell=Cell([75, 11], [64, -38, 42, -65]), delta=1200,
> prev cell=Cell([75, 11], [64, -38, 38, -64]), last_delta=1200, in
> row=[KeyValue(key=[0, 0, 42, 79, -124, -13, -80, 0, 0, 1, 0, 0, -61,
> 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4, 0, 18, 15],

See, you have two data points for the same metric, tags, and time, but
with different values.

One has value 1088039615 ([64, -38, 42, -65]) and the other has value
1088038592 ([64, -38, 38, -64]), but they're both at the same time:
[79, -124, -13, -80] + 1200 = 1334113200 + 1200 = 1334114400 = Wed Apr
11 05:20:00 2012 CEST.

So in this case the TSD doesn't know which value is the correct value,
and it complains about the duplicate data point.
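
To make that arithmetic reproducible, here is a small standalone Java
sketch (plain JDK, not OpenTSDB internals) that decodes the base time
from the row key and the 1200-second offset from the 2-byte qualifier
[75, 11]; the assumption in the comments is the second-resolution
qualifier layout (high 12 bits = offset in seconds, low 4 bits = flags):

import java.nio.ByteBuffer;

public class DecodeOffset {
  public static void main(String[] args) {
    // The four base-time bytes from the row key: an hour-aligned Unix
    // timestamp stored big-endian.
    byte[] baseTimeBytes = { 79, -124, -13, -80 };           // 0x4F84F3B0
    int baseTime = ByteBuffer.wrap(baseTimeBytes).getInt();  // 1334113200

    // Qualifier [75, 11] = 0x4B0B: high 12 bits = offset in seconds,
    // low 4 bits = format flags.
    int qualifier = ((75 & 0xFF) << 8) | (11 & 0xFF);        // 0x4B0B
    int delta = qualifier >>> 4;                             // 1200

    // 1334113200 + 1200 = 1334114400 = Wed Apr 11 2012, 03:20:00 UTC
    System.out.println(baseTime + " + " + delta + " = " + (baseTime + delta));
  }
}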

I've come to realize that this is annoying in certain use cases, such
as when you import some partially incorrect data and later re-import
the same time range with slightly different (presumably corrected)
data, without deleting anything in the meantime. I'm thinking of
automatically resolving the "conflict" by keeping whichever value was
written last to HBase (based on the timestamp of the KeyValue).
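
Purely as an illustration of that idea (not the code OpenTSDB actually
uses), "keep whichever was written last" boils down to comparing the
HBase write timestamps of the two KeyValues; in the row above, the
single put (timestamp 1334290799915) is newer than the compacted column
(timestamp 1334287886985), so its value would win:

public class LastWriteWins {
  // Keep the value whose KeyValue was written to HBase most recently.
  static byte[] resolve(final long writeTimeA, final byte[] valueA,
                        final long writeTimeB, final byte[] valueB) {
    return writeTimeA >= writeTimeB ? valueA : valueB;
  }

  public static void main(String[] args) {
    final byte[] kept = resolve(1334290799915L, new byte[] { 64, -38, 42, -65 },
                                1334287886985L, new byte[] { 64, -38, 38, -64 });
    System.out.println(java.util.Arrays.toString(kept));  // [64, -38, 42, -65]
  }
}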

In the meantime, your workaround is to always delete the range of data
you're going to overwrite. You can do this using the "scan --delete"
command; see http://opentsdb.net/cli.html#scan

Hope this helps.

--
Benoit "tsuna" Sigoure
Software Engineer @ www.StumbleUpon.com

Hugo

Apr 13, 2012, 5:35:07 AM
to tsuna, OpenTSDB
Thanks, this explains a lot.

It's a bit troublesome because of the way we use OpenTSDB. A feature
where OpenTSDB resolves this silently would be much appreciated.

If you can give me some pointers, I might be able to get started on
implementing it.

Cheers,

Hugo

Mikael Byström

Jan 2, 2014, 8:25:03 AM
to open...@googlegroups.com, tsuna, hu...@trippaers.nl
Try using:

./tsdb fsck 2013/01/01-00:00 sum os.cpu

./tsdb fsck 2012/01/01-00:00 sum os.cpu --fix

Or any other metric you would like to fix.

Regards

ManOLamancha

Jan 2, 2014, 2:47:04 PM
to open...@googlegroups.com, tsuna, hu...@trippaers.nl
On Friday, April 13, 2012 2:35:07 AM UTC-7, Hugo wrote:
Thanks, this explains a lot.

It's a bit troublesome because of the way we use OpenTSDB. A feature
where OpenTSDB resolves this silently would be much appreciated.

If you can give me some pointers, I might be able to get started on
implementing it.

Sure, you'd want to add an option to enable auto-repairs in
utils/Config.java so as to maintain backwards compatibility. Then look
at core/Internal.java, core/CompactionQueue.java and core/TsdbQuery.java,
all in the "next" branch.
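
For example (the flag name below is hypothetical, just to illustrate
the backwards-compatible default), the new opentsdb.conf option could
default to off so existing installations keep today's strict behavior:

# Hypothetical flag name; defaults to false to preserve current behavior.
tsd.core.auto_repair_duplicates = false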

Sivakumar S

Sep 2, 2016, 6:32:43 AM
to OpenTSDB, tsun...@gmail.com, hu...@trippaers.nl
Good information, thank you.
Can you explain how the timestamp is decoded, i.e. how
[79, -124, -13, -80] becomes 1334113200?

--
Siva

ManOLamancha

Dec 19, 2016, 8:47:51 PM
to OpenTSDB, tsun...@gmail.com, hu...@trippaers.nl
On Friday, September 2, 2016 at 3:32:43 AM UTC-7, Sivakumar S wrote:
Good information, thank you.
Can you explain how the timestamp is decoded, i.e. how
[79, -124, -13, -80] becomes 1334113200?

Sure. The byte array [79, -124, -13, -80] is just how Java prints a
4-byte array: each element is a signed Java byte. OpenTSDB takes the
Unix epoch timestamp of the row, aligned to an hour boundary, and
stores it as a 32-bit big-endian integer, which gives those 4 bytes.
Shifting the bytes back out of the array into an int recovers the
timestamp. The TSDB Java code (using AsyncHBase's Bytes helper) does
the equivalent of the following; the class wrapper is only there so the
snippet can be run standalone:

import org.hbase.async.Bytes;

public class DecodeBaseTime {
  public static void main(String[] args) {
    // The four base-time bytes from the row key, as signed Java bytes.
    byte[] ts_bytes = new byte[] { 79, -124, -13, -80 };

    // Shift the bytes back into a 32-bit int: 0x4F84F3B0 = 1334113200.
    int ts = Bytes.getInt(ts_bytes);
    System.out.println(ts);  // prints 1334113200

    // And the reverse: encode the int back into the same 4-byte array.
    ts_bytes = Bytes.fromInt(ts);
  }
}

Adding the data point's qualifier offset (1200 seconds in the example
above) to that base time then gives the actual timestamp of the data
point: 1334113200 + 1200 = 1334114400.
