Error using "multi file to gcs sink"


virgilio pierini

Apr 9, 2020, 8:04:11 AM
to CDAP User
Hi all,
I'm trying to use the "multi file to gcs sink" to write data to different directories based on a specific property.
I'm NOT reading from a multi-table source; I'm just calculating a field called "destination" that I want to use instead of "tablename".

When I run it in a Spark streaming pipeline, it asks for a parameter called "__provided_schema__", which I don't recognize, and it fails with the following error:

2020-04-08 22:37:32,063 - ERROR [streaming-job-executor-0:i.c.c.e.s.s.f.StreamingBatchSinkFunction@112] - Error writing to sink GCS Multi File for the batch for time 1586385430000.
java.lang.IllegalArgumentException: GCS Multi File has no outputs. Please check that the sink calls addOutput at some point.
at io.cdap.cdap.etl.spark.batch.SparkBatchSinkFactory.writeFromRDD(SparkBatchSinkFactory.java:85) ~[hydrator-spark-core2_2.11-6.1.1.jar:na]
at io.cdap.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.call(StreamingBatchSinkFunction.java:100) [hydrator-spark-core2_2.11-6.1.1.jar:na]
at io.cdap.cdap.etl.spark.streaming.function.StreamingBatchSinkFunction.call(StreamingBatchSinkFunction.java:51) [hydrator-spark-core2_2.11-6.1.1.jar:na]
at io.cdap.cdap.etl.spark.Compat$1.call(Compat.java:65) [hydrator-spark-core2_2.11-6.1.1.jar:na]
at io.cdap.cdap.etl.spark.Compat$1.call(Compat.java:62) [hydrator-spark-core2_2.11-6.1.1.jar:na]
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:280) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at scala.util.Try$.apply(Try.scala:192) [scala-library-2.11.8.jar:na]
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) [scala-library-2.11.8.jar:na]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) [spark-streaming_2.11-2.3.4.jar:2.3.4]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_242]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_242]
2020-04-08 22:37:32,080 - DEBUG [spark-listener-group-shared:i.c.c.a.r.s.AbstractSparkExecutionContext@154] - Spark program=program:default.test_avro_pubsub.-SNAPSHOT.spark.DataStreamsSparkStreaming, runId=9f5c6eb4-79e8-11ea-b920-a2fea9e6d990, jobId=3 starts without transaction
2020-04-08 22:37:32,080 - DEBUG [spark-listener-group-shared:i.c.c.a.r.s.SparkTransactionHandler@110] - Spark job started: JobTransaction{jobId=3, stageIds=[4], transaction=null}
2020-04-08 22:37:32,125 - DEBUG [spark-listener-group-shared:i.c.c.a.r.s.SparkTransactionHandler@144] - Spark job ended: JobTransaction{jobId=3, stageIds=[4], transaction=null}



Any help is appreciated.
Regards,
Virgilio


Albert Shau

Apr 9, 2020, 1:30:55 PM
to cdap...@googlegroups.com
Hi Virgilio,

The multi sink plugins assume they are being used in conjunction with a multi source, which will set a runtime argument for each "table" containing the schema for that "table" (see source and sink).

Argument keys are of the form "multisink.[tablename]", with the value being the schema. If you're not using the source, you'll have to set these yourself at the start of the run.
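
For example (a rough sketch with illustrative names only; your actual columns and schema will differ), if your calculated field takes the value "special", you would set a runtime argument before starting the run along these lines, with the value being the schema of the records going to that output as Avro-style JSON:

  key:   multisink.special
  value: {"type":"record","name":"etlSchemaBody","fields":[{"name":"column1","type":"string"},{"name":"column2","type":"long"}]}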

Regards,
Albert


virgilio pierini

Apr 9, 2020, 5:51:59 PM
to CDAP User
Ok, got the idea.
Thank you.



virgilio pierini

Apr 9, 2020, 6:20:55 PM
to CDAP User
Hmm,
sorry to bother again, but I was thinking:
wouldn't it be valuable to store data in different folders even if it doesn't come from a database?
My whole journey started with a Kafka topic where some messages need special processing and a different landing area. I could split the data if the number of areas were known in advance, but you know, flexible, extensible systems tend to be a bit too parametric. So the idea was: let's write to a folder whose name comes from a field.

Given that I have a JavaScript step which creates a field "destination_folder" based on some calculations, are you aware of a way to set the "multisink.*" argument keys from the JavaScript step, and which format is expected? E.g.: column1 string, column2 long
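
For reference, my JavaScript step is roughly like this (simplified; the field names and the routing rule here are just an example):

function transform(input, emitter, context) {
  // pass the record through and add the routing field
  var output = input;
  // made-up rule: messages flagged as "special" go to their own folder
  output.destination_folder = (input.message_type === 'special') ? 'special' : 'standard';
  emitter.emit(output);
}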

Virgilio