Spark Validation Step - java.lang.UnsupportedOperationException: empty collection

855 views
Skip to first unread message

geouf...@googlemail.com

unread,
May 22, 2017, 6:34:57 AM5/22/17
to Kylo Community
Hi,

When running sample userdata on EMR 5.5 cluster I'm running into the below error. I have kylo installed on both an edge node as well as a core node (for testing purposes) and I get the same error. 

2017-05-22 10:23:39,913 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 INFO FileInputFormat: Total input paths to process : 0

2017-05-22 10:23:39,928 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 INFO SparkContext: Starting job: reduce at Validator.java:483

2017-05-22 10:23:39,931 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 INFO DAGScheduler: Job 0 finished: reduce at Validator.java:483, took 0.002555 s

2017-05-22 10:23:39,939 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 ERROR Validator: Failed to perform validation

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] java.lang.UnsupportedOperationException: empty collection

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1027)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1027)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at scala.Option.getOrElse(Option.scala:121)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1027)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.api.java.JavaRDDLike$class.reduce(JavaRDDLike.scala:385)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.api.java.AbstractJavaRDDLike.reduce(JavaRDDLike.scala:45)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at com.thinkbiganalytics.spark.datavalidator.Validator.cleansedRowResultsValidationCounts(Validator.java:483)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at com.thinkbiganalytics.spark.datavalidator.Validator.doValidate(Validator.java:231)

2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at com.thinkbiganalytics.spark.datavalidator.Validator.main(Validator.java:148)


Any idea what might be causing this?


Regards,

Geouffrey


Matt Hutton

unread,
May 22, 2017, 11:18:24 AM5/22/17
to Kylo Community
Geouffrey-

The standard ingest creates a few tables, can you verify there is data in your feed table for the latest partition that this error occurred on? If using the sandbox you can do something like this, substituting the category, feed, and processing timestamp used for the partition:

sudo su nifi
hive
show databases;
use <category_sysname>;
show tables;
select count(0) from <feed_sysname>_feed where processing_dttm = <???>;
select count(0) from <feed_sysname>_valid where processing_dttm = <???>;




geouf...@googlemail.com

unread,
May 22, 2017, 11:33:17 AM5/22/17
to Kylo Community
Hi Matt,

Thanks for the reply. I looked at those earlier and can confirm they were all empty. I wasn't sure when those tables are populated... Which Step in Nifi populates those tables?

Matt Hutton

unread,
May 22, 2017, 12:13:37 PM5/22/17
to Kylo Community
The standard ingest pipeline 1) moves the raw ingested file(s) to HDFS 2) creates an external Hive partition on the feed table against the file(s) location 2) Spark job queries the feed table, cleanses and validates the data. Valid records are written to the <feed_valid> table and invalid records are written to the <feed_invalid> table.

 If the feed table is completely empty something might have gone wrong with one of the initial two steps.  Make sure NiFi can write to HDFS and query hive. Nifi is at http://<host>:8079/nifi.  Also, you can try to ingest again and watch the logs at /var/log/nifi/nifi-app.log   Normally you would see a an error if any of these steps fail and the job would fail rather than moving on to the Spark step.  

geouf...@googlemail.com

unread,
May 23, 2017, 6:58:09 AM5/23/17
to Kylo Community
Hi Matt,

That would explain why I'm getting the error as my hive table is empty. I'm following the NiFi flow can see why it's empty but not sure why my external table is creating as expected. I have the following configuration (in brackets are the original values supplied with kylo:

# Root HDFS locations for new raw files

config.hdfs.ingest.root=kylo/etl    (/etl)

# Root HDFS location for Hive ingest processing tables (raw,valid,invalid)

config.hive.ingest.root=s3n://ydginsights/kylo/model.db       (/model.db)

# Root HDFS location for Hive profile table

config.hive.profile.root=s3n://ydginsights/kylo/model.db      (/model.db)

# Root HDFS location for Hive master table

config.hive.master.root=s3n://ydginsights/kylo/app/warehouse  (/app/warehouse)


I've disabled Register HDFS Folders (CreateHDFSFolder Processor)


I've replaced PutHDFS with PutS3Object Processors

The Upload To HDFS Processor writes data to:

s3n://bucketname/${hdfs.ingest.root}/${category}/${feed}/${feedts}/${filename}


and I can see data in s3n://bucketname/kylo/etl/${category}/${feed}/${feedts}/ as expected from above configuration


When I look at hive, I can see tables are created as follow:

user_signups_a >          s3n://ydginsights/kylo/app/warehouse/website_a/user_signups_a

user_signups_a_feed >     s3n://ydginsights/kylo/model.db/website_a/user_signups_a/feed - EXTERNAL

user_signups_a_invalid >  s3n://ydginsights/kylo/model.db/website_a/user_signups_a/invalid

user_signups_a_profile >  s3n://ydginsights/kylo/model.db/website_a/user_signups_a/profile

user_signups_a_valid >    s3n://ydginsights/kylo/model.db/website_a/user_signups_a/valid

This is the CREATE statement from nifi-app.log:

CREATE EXTERNAL TABLE IF NOT EXISTS `website_a`.`user_signups_a_feed` (`registration_dttm` string, `id` string, `first_name` string, `last_name` string, `email` string, `gender` string, `ip_address` string, `cc` string, `country` string, `birthdate` string, `salary` string, `title` string, `comments` string)   PARTITIONED BY (`processing_dttm` string)  ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'

 WITH SERDEPROPERTIES ( 'separatorChar' = ',' ,'escapeChar' = '\\' ,'quoteChar' = '\"') STORED AS TEXTFILE LOCATION 's3n://ydginsights/kylo/model.db/website_a/user_signups_a/feed


So my question.... Is there a different process writing data to hive.ingest.root so that the external table user_signups_a_feed can create over it or is the CREATE STATEMENT using incorrect properties? I'm using kylo 0.8.0.1


Thanks,

Geouffrey





geouf...@googlemail.com

unread,
May 23, 2017, 11:33:31 AM5/23/17
to Kylo Community
I was looking for a step in the wrong place... it's the next step where it alters the table and adds the partition where my problem was. 

I've changed:
config.hdfs.ingest.root=s3n://ydginsights/kylo/etl 

And made a few changes to my S3PutObject processor


geouf...@googlemail.com

unread,
May 23, 2017, 1:34:12 PM5/23/17
to Kylo Community
Hi Matt,

The Validate and Split Spark job run successfully but none of the other tables are being populated. I can see data in the _feed table but _valid, _invalid, _profile are all empty. Does the spark job INSERT data into the hive tables directly?

Thanks,
Geouffrey

Jagrut Sharma

unread,
May 23, 2017, 2:00:01 PM5/23/17
to Kylo Community
Hi Geouffrey - Yes, the Validate and Split spark job will populate the _valid, _invalid and _profile tables. The Profile step will further populate the _profile table with additional data.

Thanks.
--
Jagrut

geouf...@googlemail.com

unread,
May 23, 2017, 2:07:47 PM5/23/17
to Kylo Community
Hi Jagrut,

That's what I thought but this does not seem to work at the moment. Any idea what the cause could be or where I can look to track it down. There are no errors in nifi, kylo-service or kylo-spark log files.

My hive tables are defined and stored on S3 as well


nifi-app.log

Jagrut Sharma

unread,
May 23, 2017, 2:49:59 PM5/23/17
to Kylo Community
Hi Geouffrey - Looking at the log, the validator is able to read from _feed table and identify valid and invalid records. Writing to the S3-backed table is probably facing some issue, and hence the tables are empty. Check if Spark is able to write back to S3 tables via spark-shell. If you still face issues, please log a JIRA with supporting details at https://kylo-io.atlassian.net and we can take a deeper look there.

Thanks.
--
Jagrut

geouf...@googlemail.com

unread,
May 24, 2017, 5:50:44 AM5/24/17
to Kylo Community
All done, thanks Jagrut. 

Please see KYLO-721 for more detail

Matt Hutton

unread,
May 24, 2017, 11:49:13 AM5/24/17
to Kylo Community
Geouffrey-

Can you verify you have these Hive settings?

On the edge node, edit the file: /usr/hdp/current/spark-client/conf/spark-defaults.conf
Add these configuration entries to the file:

spark.sql.hive.convertMetastoreOrc false
spark.sql.hive.convertMetastoreParquet false

You may need to restart Hive if changed..

geouf...@googlemail.com

unread,
May 24, 2017, 12:36:34 PM5/24/17
to Kylo Community
Hi Matt,

Thanks for your response. Yes that worked for the Validate and Split step. I can see data in _valid, _invalid and _profile now. I'm on the next step (Merge Step) where it's failing now... so let me dig in there.

The next error, in case you've already seen this:

TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1495525536754_0035_2_03_000000_3:java.lang.RuntimeException: java.lang.RuntimeException: Hive Runtime Error while closing operators: bound must be positive

at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)

at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)

at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)

at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)

at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)

at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)

at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.RuntimeException: Hive Runtime Error while closing operators: bound must be positive

at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.close(ReduceRecordProcessor.java:359)

at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:199)

... 14 more

Caused by: java.lang.IllegalArgumentException: bound must be positive

at java.util.Random.nextInt(Random.java:388)

at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)

at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)

at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)

at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)

at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)

at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:421)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)

at org.apache.orc.impl.WriterImpl.getStream(WriterImpl.java:2468)

at org.apache.orc.impl.WriterImpl.flushStripe(WriterImpl.java:2485)

at org.apache.orc.impl.WriterImpl.close(WriterImpl.java:2787)

at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:313)

at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:120)

at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:190)

at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1036)

at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:683)

at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)

at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.close(ReduceRecordProcessor.java:335)

... 15 more

]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_1495525536754_0035_2_03 [Reducer 3] killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0

at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:296) ~[na:na]

at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264) ~[na:na]

at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264) ~[na:na]

at com.thinkbiganalytics.ingest.TableMergeSyncSupport.doExecuteSQL(TableMergeSyncSupport.java:708) ~[kylo-nifi-core-processors-0.8.0.1.jar:na]

... 14 common frames omitted

Matt Hutton

unread,
May 24, 2017, 12:51:49 PM5/24/17
to Kylo Community
Also, take a look to see if records were written to  the _feed table but _valid, _invalid, _profile are all empty

geouf...@googlemail.com

unread,
May 24, 2017, 12:59:46 PM5/24/17
to Kylo Community
Hi Matt,

That last message confused me a bit or perhaps I don't understand what you mean...

My _feed table contains data. These are partitions pointing to my data in hdfs.ingest.root
_valid, _invalid and _profile all contain data after I made the previous change you mentioned.

I guess this is the expected state of the tables after the Validate and Split?

Matt Hutton

unread,
May 24, 2017, 1:36:40 PM5/24/17
to Kylo Community
Can you verify that you can query the data for the _valid via Hive?

select count(0) from <feed>_valid;
select *  from <feed>_valid;

Note: You can do this from Kylo in the tables tab. You can open the table for the feed, query tab, then modify the query as shown above

geouf...@googlemail.com

unread,
May 24, 2017, 2:34:20 PM5/24/17
to Kylo Community
Hi Matt,

Yes I can confirm that data is showing in _valid. That processor succeeds now.

Slowly I'm getting there...

It's in the next step where I'm seeing the latest mention error.


Here is a copy of the error message again:

at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)

at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)

at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)

at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)

at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)

at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)

at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.close(ReduceRecordProcessor.java:359)

at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:199)

... 14 more

at java.util.Random.nextInt(Random.java:388)

at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)

at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)

at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)

at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)

at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:421)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)

at org.apache.orc.impl.WriterImpl.getStream(WriterImpl.java:2468)

at org.apache.orc.impl.WriterImpl.flushStripe(WriterImpl.java:2485)

at org.apache.orc.impl.WriterImpl.close(WriterImpl.java:2787)

at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:313)

at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:120)

at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:190)

at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1036)

at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:683)

at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)

at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.close(ReduceRecordProcessor.java:335)

... 15 more

Regards,
Geouffrey

Greg Hart

unread,
May 26, 2017, 1:11:03 PM5/26/17
to Kylo Community
Hi Geouffrey,

This looks like a problem with the S3A filesystem and not with Kylo. Please try the Apache Hadoop mailing lists or contact Think Big for additional support options.

geouf...@googlemail.com

unread,
May 31, 2017, 11:56:57 AM5/31/17
to Kylo Community
In case anyone else runs into the same problem...

I added the following property to core-site.xml

<property>

  <name>fs.s3a.fast.upload</name>

  <value>true</value>

</property>


I also added the following to hive-site.xml

  <property>

    <name>hive.exec.dynamic.partition</name>

    <value>true</value>

  </property>


  <property>

    <name>hive.exec.dynamic.partition.mode</name>

    <value>nonstrict</value>

  </property>


I'm using s3a:// for everything at the moment.


Thanks for your help. I finally have a full end to end sample flow working using Kylo and S3.



Matt Hutton

unread,
May 31, 2017, 11:58:30 AM5/31/17
to Kylo Community
Thank you for contributing your solution!
Reply all
Reply to author
Forward
0 new messages