Are Bigtable Dataflow mutations atomic at the row level?


Vinay Chitlangia

Feb 15, 2018, 11:05:54 PM
to Google Cloud Bigtable Discuss
I have a Dataflow job that moves values from one column to another within the same row. The relevant code snippet is:

context.output(
    new Delete(context.element().getRow())
        .addColumn(
            CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp()));
context.output(
    new Put(context.element().getRow())
        .addColumn(
            CellUtil.cloneFamily(cell),
            newColumn.getBytes(),
            cell.getTimestamp(),
            newValue.build().toByteArray()));

Is this guaranteed to be atomic at the row level? Is it possible that the delete happens but the put does not?

If the same move were done across column families, would it still be atomic at the row level?

I am using Beam 2.1.0 and bigtable-hbase-beam 1.0.0.

Solomon Duskis

Feb 20, 2018, 10:59:20 PM
to Google Cloud Bigtable Discuss
This is not guaranteed to be row atomic. You want a RowMutations for atomic Put+Delete combinations. Unfortunately, we don't support RowMutations in CloudBigtableIO yet.
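For illustration, here is a minimal sketch of what an atomic Put+Delete looks like with the plain HBase client API, outside of CloudBigtableIO. The class name `MoveColumn` and its methods are hypothetical helpers, not part of any library; the sketch assumes an HBase 1.x-style client on the classpath and an already opened `Table`.

```java
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

/** Hypothetical helper; the class and method names are illustrative only. */
public class MoveColumn {

  /** Builds one RowMutations that deletes the old cell and writes the new one. */
  public static RowMutations moveCell(
      byte[] row,
      byte[] family,
      byte[] oldQualifier,
      byte[] newQualifier,
      long timestamp,
      byte[] newValue)
      throws IOException {
    RowMutations mutations = new RowMutations(row);
    // Delete only the cell version at `timestamp` in the old column.
    mutations.add(new Delete(row).addColumn(family, oldQualifier, timestamp));
    // Write the new value at the same timestamp in the new column.
    mutations.add(new Put(row).addColumn(family, newQualifier, timestamp, newValue));
    return mutations;
  }

  /** Sends both mutations in a single row-level call. */
  public static void apply(Table table, RowMutations mutations) throws IOException {
    table.mutateRow(mutations);
  }
}
```

Both mutations travel in one `mutateRow` call, so the delete and the put are applied together or not at all. The Put and the Delete each carry their own column family, so nothing in this shape restricts the move to a single family.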

As of now, there are a couple of things you can do.  

1) Extend AbstractCloudBigtableTableDoFn and do something like this:


    @StartBundle
    public synchronized void getBufferedMutator(StartBundleContext context)
        throws IOException {
      this.mutator = getConnection().getBufferedMutator([Your table]);
    }

    /**
     * Performs an asynchronous mutation via {@link BufferedMutator#mutate(Mutation)}.
     */
    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
      Mutation mutation = new RowMutation(...)
      mutator.mutate(mutation);
    }

    /**
     * Closes the {@link BufferedMutator} and {@link Connection}.
     */
    @FinishBundle
    public synchronized void finishBundle(FinishBundleContext context) throws Exception {
      try {
        if (mutator != null) {
          mutator.close();
          mutator = null;
        }
      } catch (RetriesExhaustedWithDetailsException exception) {
        rethrowException(exception);
      }
    }
  }



2) Use BigtableIO instead, which should allow both delete and SetCell mutations.

Vinay Chitlangia

Feb 23, 2018, 11:51:15 PM
to Google Cloud Bigtable Discuss
Thanks Solomon.

Some questions inline


On Wednesday, February 21, 2018 at 9:29:20 AM UTC+5:30, Solomon Duskis wrote:
      Mutation mutation = new RowMutation(...)

I am assuming you mean RowMutations? If so, it does not derive from Mutation, and I couldn't find any method
in BufferedMutator that accepts a RowMutations. BufferedMutator does take a List<Mutation>, but the HBase
documentation suggests that such a list need not be applied atomically.

2) Use BigtableIO instead, which should allow both delete and SetCell mutations.
Is there an example? 

Solomon Duskis

Feb 26, 2018, 10:32:43 AM
to chitl...@gmail.com, google-cloud-b...@googlegroups.com
You're right.  It's RowMutations, which is a Row and not a Mutation in HBase 1.x.  I guess you'll have to collect the RowMutations in a List, and then call table.batch(list) in @FinishBundle.
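A rough sketch of the "collect, then batch" idea, written against the plain HBase `Table` interface rather than a full DoFn. `RowMutationsBuffer` is a hypothetical class introduced only for this illustration; a DoFn would call `add` from @ProcessElement and `flush` from @FinishBundle with a `Table` obtained from its connection. The two-argument `Table.batch(actions, results)` is used here because the single-argument form is deprecated in HBase 1.x.

```java
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-bundle buffer; not part of HBase, Beam, or bigtable-hbase-beam. */
public class RowMutationsBuffer {
  private final List<RowMutations> pending = new ArrayList<>();

  /** Call from @ProcessElement: queue the row's mutations instead of writing them. */
  public void add(RowMutations mutations) {
    pending.add(mutations);
  }

  /** Number of RowMutations waiting to be flushed. */
  public int size() {
    return pending.size();
  }

  /**
   * Call from @FinishBundle: send everything queued so far in one batch call.
   * The buffer is cleared only if the batch call returns without throwing.
   */
  public Object[] flush(Table table) throws IOException, InterruptedException {
    Object[] results = new Object[pending.size()];
    if (!pending.isEmpty()) {
      table.batch(pending, results);
      pending.clear();
    }
    return results;
  }
}
```

Each RowMutations in the list is still a single-row unit; the batch call groups them for transport and does not make the whole list atomic across rows. A long-running bundle may also want to flush once the buffer reaches some size limit, which this sketch leaves out.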

Here's BigtableIO's example of making the equivalent of a "Put".  Here's our conversion between Deletes and Mutation.
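As a hedged sketch of the BigtableIO route: BigtableIO works with Bigtable's own `com.google.bigtable.v2.Mutation` protos instead of HBase Put/Delete objects, so the column move becomes a DeleteFromColumn plus a SetCell. `MoveColumnProto` is a hypothetical helper name. The sketch assumes the input timestamp is an HBase-style millisecond value, converted to the microseconds the proto expects, and that the table uses millisecond timestamp granularity, so the delete range spans exactly one millisecond.

```java
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.TimestampRange;
import com.google.protobuf.ByteString;

import java.util.Arrays;
import java.util.List;

/** Hypothetical helper; the class and method names are illustrative only. */
public class MoveColumnProto {

  /** Builds the delete-then-set pair for one row, in the order it should be applied. */
  public static List<Mutation> moveCell(
      String family,
      ByteString oldQualifier,
      ByteString newQualifier,
      long timestampMillis,
      ByteString newValue) {
    // Bigtable protos use microseconds; HBase cells use milliseconds.
    long micros = timestampMillis * 1000L;

    Mutation delete =
        Mutation.newBuilder()
            .setDeleteFromColumn(
                Mutation.DeleteFromColumn.newBuilder()
                    .setFamilyName(family)
                    .setColumnQualifier(oldQualifier)
                    // Start is inclusive and end is exclusive: cover one millisecond.
                    .setTimeRange(
                        TimestampRange.newBuilder()
                            .setStartTimestampMicros(micros)
                            .setEndTimestampMicros(micros + 1000L)))
            .build();

    Mutation set =
        Mutation.newBuilder()
            .setSetCell(
                Mutation.SetCell.newBuilder()
                    .setFamilyName(family)
                    .setColumnQualifier(newQualifier)
                    .setTimestampMicros(micros)
                    .setValue(newValue))
            .build();

    return Arrays.asList(delete, set);
  }
}
```

With BigtableIO, the write transform takes elements of type `KV<ByteString, Iterable<Mutation>>`, so the list above would be paired with the row key and emitted as a single element. As far as I understand, all mutations in one such element are sent as one row mutation request, which is what gives the Put+Delete pair its row-level atomicity; check this against the BigtableIO version you are using.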


Solomon Duskis | Google Cloud Bigtable Tech Lead | sdu...@google.com | 914-462-0531

