How does the compaction system deal with duplicate datapoints?
We use daily batch imports of data, and we import more than 24 hours'
worth to make sure we don't miss any points. Without compaction this is
no problem, as HBase/OpenTSDB will just overwrite the existing datapoint
with a new version. However, I wonder what happens with compacted data
if a new datapoint is added for an already existing, but compacted,
row.
The reason I'm asking is that I've been seeing some weird graphs since
enabling compaction, and got this error on one of the queries:
net.opentsdb.core.IllegalDataException: Found out of order or
duplicate data: cell=Cell([75, 11], [64, -38, 42, -65]), delta=1200,
prev cell=Cell([75, 11], [64, -38, 38, -64]), last_delta=1200, in
row=[KeyValue(key=[0, 0, 42, 79, -124, -13, -80, 0, 0, 1, 0, 0, -61,
0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4, 0, 18, 15],
family="t", qualifier=[0, 11], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[0, 11, 18, -53, 37, -117, 56, 75,
75, 11, 93, -53, 112, -117, -125, 75, -106, 11, -88, -53, -69, -117,
-50, 75], value="@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00@\xDA&\xC0@P\xAFS@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00@@\x00\x00\x00",
timestamp=1334287886985), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[18, -53], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="%\x8B", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="8K", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="K\x0B", value="@\xDA*\xBF",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="]\xCB", value="@P\xAF\x86",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="p\x8B", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="\x83K", value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[-106, 11], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[-88, -53], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier=[-69, -117], value="@@\x00\x00",
timestamp=1334290799915), KeyValue(key=[0, 0, 42, 79, -124, -13, -80,
0, 0, 1, 0, 0, -61, 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4,
0, 18, 15], family="t", qualifier="\xCEK", value="@@\x00\x00",
timestamp=1334290799915)] -- run an fsck.
Cheers,
Hugo
--
Tsune ni ite, kyu ni awasu. (Be as you always are; meet the sudden as it comes.)
Compaction ignores truly duplicate data points (same metric, tags, time,
and value), but it complains when the values differ.
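To make that rule concrete, here is a minimal sketch in plain Java (not OpenTSDB's actual compaction code; the class and method names are made up for illustration): a cell is a harmless duplicate only if both its qualifier (the time offset) and its encoded value match byte for byte.

```java
import java.util.Arrays;

// Hypothetical sketch of the duplicate check described above.
public class DedupSketch {
    static boolean isHarmlessDuplicate(byte[] qualA, byte[] valA,
                                       byte[] qualB, byte[] valB) {
        // Same time offset (qualifier) AND same encoded value: safe to drop one.
        // Same qualifier but a different value is the error case from the thread.
        return Arrays.equals(qualA, qualB) && Arrays.equals(valA, valB);
    }

    public static void main(String[] args) {
        byte[] qual = {75, 11};             // the qualifier from the error above
        byte[] v1 = {64, -38, 42, -65};     // one of the conflicting values
        byte[] v2 = {64, -38, 38, -64};     // the other conflicting value
        System.out.println(isHarmlessDuplicate(qual, v1, qual, v1)); // identical re-import
        System.out.println(isHarmlessDuplicate(qual, v1, qual, v2)); // conflicting values
    }
}
```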
> We use daily batch imports of data and we import more than 24 hours to
> make sure we don't miss any points. Without compaction this is no
> problem as hbase/opentsdb will just overwrite the existing datapoint
> with a new version. However i wonder what happens with compacted data
> if a new datapoint is added for an already existing, but compacted row.
As long as you re-import exactly the same data plus some additional
data points, you're fine.
> Reason i'm asking is because I'm seeing some weird graphs since
> enabling compaction and got this error on one of the queries:
>
> net.opentsdb.core.IllegalDataException: Found out of order or
> duplicate data: cell=Cell([75, 11], [64, -38, 42, -65]), delta=1200,
> prev cell=Cell([75, 11], [64, -38, 38, -64]), last_delta=1200, in
> row=[KeyValue(key=[0, 0, 42, 79, -124, -13, -80, 0, 0, 1, 0, 0, -61,
> 0, 0, 2, 0, 19, 2, 0, 0, 3, 0, 11, 119, 0, 0, 4, 0, 18, 15],
See, you have two data points for the same metric, tags, and time, but
with different values.
One has the value 1088039615 ([64, -38, 42, -65]); the other has the
value 1088038592 ([64, -38, 38, -64]). But they're both at the same time
([79, -124, -13, -80] + 1200 = 1334113200 + 1200 = 1334114400 = Wed Apr 11
05:20:00 2012 CET).
So in this case the TSD doesn't know which value is the correct value,
and it complains about the duplicate data point.
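For the curious, the numbers in that error message can be decoded by hand. Below is a hedged walk-through in plain Java (no OpenTSDB classes; the class name is made up), assuming the second-resolution cell format: a 2-byte qualifier whose top 12 bits are the delta in seconds from the row base time and whose low 4 bits are flags (a float bit plus the value length minus one).

```java
// Hypothetical decode of the cell from the error above.
public class CellDecode {
    public static void main(String[] args) {
        byte[] qualifier = {75, 11};        // "cell=Cell([75, 11], ...)"
        int q = ((qualifier[0] & 0xFF) << 8) | (qualifier[1] & 0xFF); // 0x4B0B
        int delta = q >>> 4;                // top 12 bits: seconds from base time
        int flags = q & 0x0F;               // 0xB: float bit set, 4-byte value
        System.out.println(delta);          // matches "delta=1200" in the error

        byte[] value = {64, -38, 42, -65};  // the conflicting value bytes
        int bits = ((value[0] & 0xFF) << 24) | ((value[1] & 0xFF) << 16)
                 | ((value[2] & 0xFF) << 8) |  (value[3] & 0xFF);
        System.out.println(bits);           // the raw-int view, 1088039615
        System.out.println((flags & 0x8) != 0);          // float flag is set
        System.out.println(Float.intBitsToFloat(bits));  // the actual datapoint
    }
}
```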
I've come to realize that this is annoying in certain use cases, such
as when you import some partially incorrect data and later re-import
the same time range with slightly different (presumably corrected)
data, without deleting anything in the meantime. I'm thinking of
automatically resolving the "conflict" by keeping whichever value was
written last to HBase (based on the timestamp of the KeyValue).
In the meantime, your workaround is to always delete the range of data
you're going to overwrite. You can do this using the "scan --delete"
command; see http://opentsdb.net/cli.html#scan
Hope this helps.
--
Benoit "tsuna" Sigoure
Software Engineer @ www.StumbleUpon.com
Thanks, this explains a lot. It's a bit troublesome because of the way we use OpenTSDB. A feature where OpenTSDB resolves this silently would be much appreciated. If you can give me some pointers, I might be able to get started on implementing it.
Good information, thank you. Can you explain the decode methodology for the timestamp, i.e. how [79, -124, -13, -80] becomes 1334113200?
// Using asynchbase's org.hbase.async.Bytes helpers:
byte[] ts_bytes = new byte[] {79, -124, -13, -80};
int ts = Bytes.getInt(ts_bytes);   // big-endian 4-byte decode -> 1334113200
System.out.println(ts);
ts_bytes = Bytes.fromInt(ts);      // and back to the original 4 bytes
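If you'd rather not depend on a helper class, the same decode can be done with plain bit-shifting: the four row-key bytes are simply a big-endian 32-bit Unix timestamp (the row's base time), to which the qualifier's delta is added. A self-contained sketch (the class name is made up):

```java
import java.util.Date;

// Manual big-endian decode of the timestamp bytes from the thread.
public class TimestampDecode {
    public static void main(String[] args) {
        byte[] ts_bytes = {79, -124, -13, -80};
        // Mask each byte to 0..255 (Java bytes are signed), then shift into place.
        int base = ((ts_bytes[0] & 0xFF) << 24)
                 | ((ts_bytes[1] & 0xFF) << 16)
                 | ((ts_bytes[2] & 0xFF) << 8)
                 |  (ts_bytes[3] & 0xFF);
        System.out.println(base);              // the row base time, 1334113200
        long actualSec = base + 1200;          // add the cell's 1200s delta
        System.out.println(actualSec);         // 1334114400
        System.out.println(new Date(actualSec * 1000L)); // wall-clock time (TZ-dependent)
    }
}
```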