table-lookup directive in data fusion wrangler


Mohamed Ali Bardi

Feb 18, 2022, 2:01:29 PM
to CDAP User
Hello,

I'm trying to use the table-lookup :country_code 'country_lookup_table' directive in a Wrangler in my Data Fusion pipeline, but I'm getting: Error encountered while executing 'table-lookup' : Dataset 'country_lookup_table' could not be instantiated. Make sure that a dataset 'country_lookup_table' of type Table exists.
The country_code column is present in my Wrangler's input, and I have a lookup table 'country_lookup_table' in BigQuery.
Have I omitted any steps?

Thanks in advance

Mohamed Ali Bardi

Feb 18, 2022, 2:18:55 PM
to CDAP User
Here's the stack trace in case it helps.

org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:162) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) ~[spark-core_2.12-3.1.2.jar:na]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_312]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_312]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_312]
Caused by: java.lang.Exception: Stage:myWrangler - Failing pipeline due to error : Lookup is not supported in Spark pipelines.
        at io.cdap.wrangler.Wrangler.transform(Wrangler.java:402) ~[%20artifact5641174554694186557.jar:na]
        at io.cdap.wrangler.Wrangler.transform(Wrangler.java:82) ~[%20artifact5641174554694186557.jar:na]
        at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.5.1.jar:na]
        at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.5.1.jar:na]
        at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.5.1.jar:na]
        at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.5.1.jar:na]
        at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:54) ~[hydrator-spark-core2_2.11-6.5.1.jar:na]
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.14.jar:na]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.14.jar:na]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) ~[scala-library-2.12.14.jar:na]
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) ~[scala-library-2.12.14.jar:na]
        at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:135) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473) ~[spark-core_2.12-3.1.2.jar:na]
        at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:134) ~[spark-core_2.12-3.1.2.jar:3.1.2]
        ... 9 common frames omitted
Caused by: java.lang.UnsupportedOperationException: Lookup is not supported in Spark pipelines.
        at io.cdap.cdap.etl.spark.NoLookupProvider.provide(NoLookupProvider.java:32) ~[hydrator-spark-core2_2.11-6.5.1.jar:na]
        at io.cdap.cdap.etl.common.AbstractTransformContext.provide(AbstractTransformContext.java:44) ~[cdap-etl-core-6.5.1.jar:na]
        at io.cdap.wrangler.WranglerPipelineContext.provide(WranglerPipelineContext.java:118) ~[%20artifact5641174554694186557.jar:na]
        at io.cdap.directives.lookup.TableLookup.ensureInitialized(TableLookup.java:85) ~[na:na]
        at io.cdap.directives.lookup.TableLookup.execute(TableLookup.java:101) ~[na:na]
        at io.cdap.directives.lookup.TableLookup.execute(TableLookup.java:47) ~[na:na]
        at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:121) ~[wrangler-core-4.5.1.jar:na]
        at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:90) ~[wrangler-core-4.5.1.jar:na]
        at io.cdap.wrangler.Wrangler.transform(Wrangler.java:377) ~[%20artifact5641174554694186557.jar:na]

Thanks

Albert Shau

Feb 18, 2022, 5:37:47 PM
to cdap...@googlegroups.com
Lookups are not supported on Data Fusion. They are only supported when running CDAP on Hadoop with HBase and CDAP's transaction service.

In general, it is recommended to use a join instead of point lookups: a join has more predictable semantics (the lookup table is read once, rather than repeatedly over the course of the run) and generally performs better. If your lookup table is small, be sure to configure the join to load it into memory for best performance.
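To illustrate the idea behind that recommendation, here is a minimal sketch in plain Python (not CDAP's actual Joiner plugin, whose configuration is done in the pipeline UI): the small lookup table is loaded into an in-memory map once, and every record is then enriched from that map, rather than issuing one dataset lookup per record. The column names mirror the example in this thread; the function name is hypothetical.

```python
# Hypothetical sketch of an in-memory (broadcast-style) join, replacing
# per-record point lookups. Not CDAP API code.

def broadcast_join(records, lookup_rows, key="country_code"):
    """Left-join records against a small lookup table held in memory."""
    # Build the lookup map a single time, instead of one lookup per record.
    lookup = {row[key]: row for row in lookup_rows}
    for rec in records:
        match = lookup.get(rec[key], {})
        # Merge lookup columns into the record; the record's own fields win.
        yield {**match, **rec}

records = [{"country_code": "TN", "amount": 10}]
countries = [{"country_code": "TN", "country_name": "Tunisia"}]
print(list(broadcast_join(records, countries)))
# [{'country_code': 'TN', 'country_name': 'Tunisia', 'amount': 10}]
```

In a Data Fusion pipeline the equivalent is a Joiner stage with the BigQuery lookup table as the in-memory side of the join.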
