Given a list of row keys, how do I read values from Bigtable in a Dataflow pipeline?

chandr...@fuelx.com

Aug 31, 2018, 5:25:53 PM
to Google Cloud Bigtable Discuss
I have a Bigtable table T1. I am running an aggregation job in which I compute a new value for rowKey1; I need to read the existing data for rowKey1 from Bigtable, merge the two values, and write the merged data back to Bigtable.

I cannot see a way to achieve this using the CloudBigtableIO read operations.
The row keys are generated in a ParDo that reads data from multiple files. I can produce a PCollection of row keys, but it is not clear how to read the data from Bigtable for each row key and then merge. I am able to write the data back using an HBase Mutation (Put); it is the read step that is unclear.

Any help is appreciated. 

Currently I am using a singleton to create the connection to Bigtable. When I run locally with the direct runner, I can see four connections being set up, and I call table.get(rowkeyBytes) to fetch each value. I am not confident this solution will scale. Is there an alternative approach I can use?
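
For reference, here is a simplified version of what each element currently does. ConnectionSingleton is a stand-in name for my own lazily-initialized connection holder, and "T1" and the merge step are placeholders.

import java.io.IOException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Current approach (simplified): one Bigtable get() per element, over a shared singleton connection.
public class PerKeyReadMergeFn extends DoFn<String, Mutation> {

    @ProcessElement
    public void processElement(ProcessContext context) throws IOException {
        String rowKey = context.element();
        // ConnectionSingleton is a placeholder for my own process-wide HBase Connection holder.
        Table table = ConnectionSingleton.get().getTable(TableName.valueOf("T1"));
        Result existing = table.get(new Get(Bytes.toBytes(rowKey)));
        Put merged = new Put(Bytes.toBytes(rowKey));
        // ...merge 'existing' with the newly computed value into 'merged'...
        context.output(merged);
    }
}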

Solomon Duskis

Sep 23, 2018, 10:05:58 AM
to Google Cloud Bigtable Discuss
First, I would suggest using AbstractCloudBigtableTableDoFn instead of managing your own connection, since it has settings that work better in Beam-like scenarios.

Second, I would suggest aggregating a List of Gets and using get(List<Get>) / put(List<Put>) for better efficiency. I don't have an optimal number for you, but ~100 would likely be fine. You can do this with the @ProcessElement / @FinishBundle annotations in your DoFn: collect the keys in the @ProcessElement method, then process them either when the list is "full" (still in @ProcessElement) or in the @FinishBundle method.
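
Roughly, the shape I have in mind looks like the following. This is an untested sketch, assuming the bigtable-hbase-beam artifact's AbstractCloudBigtableTableDoFn; the table name "T1", BATCH_SIZE, and the mergeWithExisting() helper are placeholders for your own details.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.joda.time.Instant;

import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;

// Buffers row keys, reads them back with one multi-get per ~100 keys, merges,
// and emits Puts that can be written downstream.
public class BatchedReadMergeFn extends AbstractCloudBigtableTableDoFn<String, Mutation> {

    private static final int BATCH_SIZE = 100;
    private final List<String> buffer = new ArrayList<>();

    public BatchedReadMergeFn(CloudBigtableConfiguration config) {
        super(config);
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
        buffer.add(context.element());
        if (buffer.size() >= BATCH_SIZE) {
            flush(context::output);
        }
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext context) throws Exception {
        // Emit whatever is left at the end of the bundle; GlobalWindow assumes default windowing.
        flush(m -> context.output(m, Instant.now(), GlobalWindow.INSTANCE));
    }

    private void flush(Consumer<Mutation> emit) throws Exception {
        if (buffer.isEmpty()) {
            return;
        }
        List<Get> gets = new ArrayList<>(buffer.size());
        for (String key : buffer) {
            gets.add(new Get(Bytes.toBytes(key)));
        }
        // getConnection() is provided by AbstractCloudBigtableTableDoFn.
        try (Table table = getConnection().getTable(TableName.valueOf("T1"))) {
            Result[] existing = table.get(gets);
            for (int i = 0; i < existing.length; i++) {
                emit.accept(mergeWithExisting(buffer.get(i), existing[i]));
            }
        }
        buffer.clear();
    }

    // Placeholder: combine the existing row with the newly computed value for this key.
    private Put mergeWithExisting(String rowKey, Result existing) {
        Put merged = new Put(Bytes.toBytes(rowKey));
        // ...your merge logic here...
        return merged;
    }
}

The output is a PCollection<Mutation>, so you should be able to hand it to CloudBigtableIO.writeToTable() the same way you write your Puts today.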

I hope this helps.

chandr...@fuelx.com

Oct 31, 2018, 6:57:01 PM
to Google Cloud Bigtable Discuss
Thanks Solomon,
It looks like this can help me optimize my pipeline.
I am fairly new to Dataflow. Is there an example that collects IDs in a DoFn that I can look at?

Solomon Duskis

Oct 31, 2018, 8:33:15 PM
to chandr...@fuelx.com, google-cloud-b...@googlegroups.com

Is that the type of thing you're looking for, or is there something else that might help?

Solomon Duskis | Google Cloud clients | sdu...@google.com | 914-462-0531



chandr...@fuelx.com

Nov 3, 2018, 4:12:35 PM
to Google Cloud Bigtable Discuss
I tried this approach of splitting the data into chunks. I am not sure if this is correct; when I debug it in my local environment, I can see the data being split into chunks.
Is this what you were proposing, that I split the IDs this way and then do a multi-get on Bigtable?

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;

// Buffers incoming row keys and emits them downstream in chunks of 100.
public class SplitKeyList extends DoFn<String, Iterable<String>> {

    private static final long serialVersionUID = -2014058754090539296L;

    private List<String> data = new ArrayList<>();

    @ProcessElement
    public void processElement(ProcessContext context) {
        data.add(context.element());
        if (data.size() == 100) {
            context.output(data);
            data = new ArrayList<>();
        }
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext context) {
        // Emit any partially filled chunk at the end of the bundle.
        // GlobalWindow.INSTANCE assumes the pipeline uses the default global windowing.
        if (!data.isEmpty()) {
            context.output(data, Instant.now(), GlobalWindow.INSTANCE);
            data = new ArrayList<>();
        }
    }
}
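
For the multi-get step itself, downstream of SplitKeyList, I was thinking of something roughly like this. It is an untested sketch that assumes AbstractCloudBigtableTableDoFn from bigtable-hbase-beam; the table name "T1" and the merge logic are placeholders.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.cloud.bigtable.beam.AbstractCloudBigtableTableDoFn;
import com.google.cloud.bigtable.beam.CloudBigtableConfiguration;

// Consumes each chunk emitted by SplitKeyList and issues one multi-get per chunk.
public class MultiGetMergeFn extends AbstractCloudBigtableTableDoFn<Iterable<String>, Mutation> {

    public MultiGetMergeFn(CloudBigtableConfiguration config) {
        super(config);
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
        List<String> keys = new ArrayList<>();
        List<Get> gets = new ArrayList<>();
        for (String key : context.element()) {
            keys.add(key);
            gets.add(new Get(Bytes.toBytes(key)));
        }
        // getConnection() comes from AbstractCloudBigtableTableDoFn.
        try (Table table = getConnection().getTable(TableName.valueOf("T1"))) {
            Result[] existing = table.get(gets);
            for (int i = 0; i < existing.length; i++) {
                Put merged = new Put(Bytes.toBytes(keys.get(i)));
                // ...merge existing[i] with the newly computed value for keys.get(i)...
                context.output(merged);
            }
        }
    }
}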

Solomon Duskis

Nov 5, 2018, 3:09:12 PM
to chandr...@fuelx.com, Google Cloud Bigtable Discuss
It looks right to me.


chandr...@fuelx.com

Nov 12, 2018, 4:28:28 PM
to Google Cloud Bigtable Discuss
Hi Solomon,

I tried the multi-get approach with a batch size of 100. When I ran it on a large set of data (~250M records), I didn't see any improvement in runtime performance compared to doing one get operation per key. The total CPU time taken by the job is still about the same. Do you have any suggestions on this?

Regards,
Chandu

Solomon Duskis

Nov 12, 2018, 4:38:42 PM
to chandr...@fuelx.com, google-cloud-b...@googlegroups.com
It's difficult to debug an issue like this without access to private information like your project ID. Have you created a support ticket? The support staff can walk you through the monitoring tools, including Key Visualizer and general monitoring.


Solomon Duskis | Google Cloud clients | sdu...@google.com | 914-462-0531
