2017-05-22 10:23:39,913 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 INFO FileInputFormat: Total input paths to process : 0
2017-05-22 10:23:39,928 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 INFO SparkContext: Starting job: reduce at Validator.java:483
2017-05-22 10:23:39,931 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 INFO DAGScheduler: Job 0 finished: reduce at Validator.java:483, took 0.002555 s
2017-05-22 10:23:39,939 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] 17/05/22 10:23:39 ERROR Validator: Failed to perform validation
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] java.lang.UnsupportedOperationException: empty collection
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1027)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$36.apply(RDD.scala:1027)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at scala.Option.getOrElse(Option.scala:121)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1027)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.api.java.JavaRDDLike$class.reduce(JavaRDDLike.scala:385)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at org.apache.spark.api.java.AbstractJavaRDDLike.reduce(JavaRDDLike.scala:45)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at com.thinkbiganalytics.spark.datavalidator.Validator.cleansedRowResultsValidationCounts(Validator.java:483)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at com.thinkbiganalytics.spark.datavalidator.Validator.doValidate(Validator.java:231)
2017-05-22 10:23:39,940 INFO [stream error] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=2b1f1ef0-4e06-1bd6-cb0a-69225893eb60] at com.thinkbiganalytics.spark.datavalidator.Validator.main(Validator.java:148)
Any idea what might be causing this?
Regards,
Geouffrey
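For what it's worth, the first log line reports "Total input paths to process : 0", and Spark's reduce throws exactly this "empty collection" exception when it is called on an RDD with no elements, so it looks like the validator had no rows to work with. Below is a minimal standalone sketch of that Spark behaviour (this is not Kylo's Validator code, just an illustration; guarding with isEmpty() is one way to avoid the exception):

import java.util.Collections;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class EmptyReduceDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("empty-reduce-demo").setMaster("local[1]");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // An RDD with no elements, like a feed partition with no input files.
            JavaRDD<Integer> empty = sc.parallelize(Collections.<Integer>emptyList());
            if (empty.isEmpty()) {
                System.out.println("No input rows - skipping reduce");
            } else {
                // Calling reduce() directly on an empty RDD throws
                // java.lang.UnsupportedOperationException: empty collection
                System.out.println(empty.reduce((a, b) -> a + b));
            }
        }
    }
}

If that reading is right, the thing to check is whether the _feed table location actually contains data for the run being validated.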
# Root HDFS location for new raw files
config.hdfs.ingest.root=kylo/etl (default: /etl)
# Root HDFS location for Hive ingest processing tables (raw, valid, invalid)
config.hive.ingest.root=s3n://ydginsights/kylo/model.db (default: /model.db)
# Root HDFS location for Hive profile table
config.hive.profile.root=s3n://ydginsights/kylo/model.db (default: /model.db)
# Root HDFS location for Hive master table
config.hive.master.root=s3n://ydginsights/kylo/app/warehouse (default: /app/warehouse)
I've disabled the Register HDFS Folders step (the CreateHDFSFolder processor).
I've replaced the PutHDFS processors with PutS3Object processors.
The Upload To HDFS Processor writes data to:
s3n://bucketname/${hdfs.ingest.root}/${category}/${feed}/${feedts}/${filename}
and I can see data in s3n://bucketname/kylo/etl/${category}/${feed}/${feedts}/, as expected from the configuration above.
When I look at Hive, I can see the tables were created as follows:
user_signups_a > s3n://ydginsights/kylo/app/warehouse/website_a/user_signups_a
user_signups_a_feed > s3n://ydginsights/kylo/model.db/website_a/user_signups_a/feed - EXTERNAL
user_signups_a_invalid > s3n://ydginsights/kylo/model.db/website_a/user_signups_a/invalid
user_signups_a_profile > s3n://ydginsights/kylo/model.db/website_a/user_signups_a/profile
user_signups_a_valid > s3n://ydginsights/kylo/model.db/website_a/user_signups_a/valid
This is the CREATE statement from nifi-app.log:
CREATE EXTERNAL TABLE IF NOT EXISTS `website_a`.`user_signups_a_feed` (`registration_dttm` string, `id` string, `first_name` string, `last_name` string, `email` string, `gender` string, `ip_address` string, `cc` string, `country` string, `birthdate` string, `salary` string, `title` string, `comments` string) PARTITIONED BY (`processing_dttm` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ( 'separatorChar' = ',' ,'escapeChar' = '\\' ,'quoteChar' = '\"') STORED AS TEXTFILE LOCATION 's3n://ydginsights/kylo/model.db/website_a/user_signups_a/feed'
So my question: is there a different process that writes data into hive.ingest.root so that the external table user_signups_a_feed can be created over it, or is the CREATE statement using incorrect properties? I'm using Kylo 0.8.0.1.
Thanks,
Geouffrey
TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1495525536754_0035_2_03_000000_3:java.lang.RuntimeException: java.lang.RuntimeException: Hive Runtime Error while closing operators: bound must be positive
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Hive Runtime Error while closing operators: bound must be positive
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.close(ReduceRecordProcessor.java:359)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:199)
... 14 more
Caused by: java.lang.IllegalArgumentException: bound must be positive
at java.util.Random.nextInt(Random.java:388)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:421)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:915)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:896)
at org.apache.orc.impl.WriterImpl.getStream(WriterImpl.java:2468)
at org.apache.orc.impl.WriterImpl.flushStripe(WriterImpl.java:2485)
at org.apache.orc.impl.WriterImpl.close(WriterImpl.java:2787)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.close(WriterImpl.java:313)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.close(OrcOutputFormat.java:120)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator$FSPaths.closeWriters(FileSinkOperator.java:190)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.closeOp(FileSinkOperator.java:1036)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:683)
at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:697)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.close(ReduceRecordProcessor.java:335)
... 15 more
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_1495525536754_0035_2_03 [Reducer 3] killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:296) ~[na:na]
at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264) ~[na:na]
at org.apache.commons.dbcp.DelegatingStatement.execute(DelegatingStatement.java:264) ~[na:na]
at com.thinkbiganalytics.ingest.TableMergeSyncSupport.doExecuteSQL(TableMergeSyncSupport.java:708) ~[kylo-nifi-core-processors-0.8.0.1.jar:na]
... 14 common frames omitted
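If I'm reading that trace correctly, the "bound must be positive" comes from LocalDirAllocator choosing a starting directory with Random.nextInt(numDirs): when the list of usable local buffer directories resolves to zero entries (for S3A writes I believe the property involved is fs.s3a.buffer.dir, falling back under hadoop.tmp.dir), nextInt(0) throws exactly this exception. A trivial sketch of that failure mode:

import java.util.Random;

public class BoundMustBePositiveDemo {
    public static void main(String[] args) {
        // Hypothetical: what an empty or unwritable local buffer directory list looks like
        // to the allocator when it picks a random starting index.
        int numUsableBufferDirs = 0;
        try {
            new Random().nextInt(numUsableBufferDirs);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "bound must be positive"
        }
    }
}

So pointing the buffer directory at a writable local path, or avoiding the local staging step for S3A uploads altogether, should clear this.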
Yes, I can confirm that data is showing in _valid; that processor succeeds now.
Slowly I'm getting there...
It's in the next step where I'm seeing the error I mentioned above; it's the same "bound must be positive" stack trace I posted earlier.
Regards,
Geouffrey
I added the following property:
<property>
<name>fs.s3a.fast.upload</name>
<value>true</value>
</property>
I also added the following to hive-site.xml
<property>
<name>hive.exec.dynamic.partition</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
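For reference, the same two Hive settings can also be applied per session (for example over a HiveServer2 JDBC connection) before running the partitioned insert; a rough sketch with placeholder connection details:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class DynamicPartitionSession {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port/database; adjust for your HiveServer2 instance.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
             Statement stmt = conn.createStatement()) {
            // Same effect as the hive-site.xml entries above, scoped to this session only.
            stmt.execute("SET hive.exec.dynamic.partition=true");
            stmt.execute("SET hive.exec.dynamic.partition.mode=nonstrict");
            // ... run the dynamic-partition INSERT here ...
        }
    }
}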
I'm using s3a:// for everything at the moment.
Thanks for your help. I finally have a full end-to-end sample flow working using Kylo and S3.