XML Ingest Feed with Data Transformation - No encryption/decryption key has been configured error on EMR Nodes


Ben S

Apr 17, 2019, 9:54:42 AM
to Kylo Community
Hello,

I'm having an issue when using the Data Transformation reusable template with the XML Ingest feed. My Kylo environment is in AWS and consists of a 3-node NiFi cluster, an AWS EMR cluster (1 master and 2 worker nodes), and a Kylo instance. The feed fails at the 'Execute Script' processor in Data Transformation. I've attached the NiFi log and the stderr from each of the EMR worker nodes. I get an error 13 on the NiFi node, and on the EMR worker nodes I get a 'No encryption/decryption key has been configured' error.

I have the NiFi nodes configured to use the encrypt.key from Kylo, as described in https://kylo.readthedocs.io/en/v0.10.0/installation/ManualDeploymentGuide.html#configure-the-ext-config-folder.

Any help would be appreciated.

Thanks,
Ben
nifi-app.log
Node1StdErrLog.txt
Node2StdErrLog.txt

Ben S

Apr 22, 2019, 9:03:26 AM
to Kylo Community
Also, I have verified that NiFi is loading the ENCRYPT_KEY by having an UpdateAttribute processor add an attribute set to '${ENCRYPT_KEY}'.
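
For what it's worth, a quick way to double-check the same thing from the shell (just a sketch; it assumes NiFi runs as a single JVM whose main class is org.apache.nifi.NiFi and that you have root on the node):

# prints just the variable name if ENCRYPT_KEY is present in the NiFi process environment, without echoing the key
sudo cat /proc/$(pgrep -f 'org.apache.nifi.NiFi' | head -n 1)/environ | tr '\0' '\n' | grep -o '^ENCRYPT_KEY'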

Ben S

Apr 24, 2019, 5:22:51 AM
to Kylo Community
I have identified the solution. You need to add the ENCRYPT_KEY as an environment variable for the Spark application master in spark-defaults.conf on the instance(s) running NiFi:

spark.yarn.appMasterEnv.ENCRYPT_KEY $(< /your/path/encrypt.key)
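
For example, with the ext-config path used later in this thread (the path is only an example; use whatever your NiFi ext-config folder actually is):

echo "spark.yarn.appMasterEnv.ENCRYPT_KEY $(< /opt/nifi/ext-config/encrypt.key)" >> /etc/spark/conf/spark-defaults.conf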

Ben S

Apr 28, 2019, 2:21:43 PM
to Kylo Community
Actually, it turns out this was more of a band-aid. It appears that SparkLauncher(env) at ExecuteSparkJob.java:518 does not pass the env through to the Spark driver and executors, which causes this problem. Seems like a bug. I'm going to put together a full write-up of my steps and file a bug.

Ben S

Apr 29, 2019, 5:46:23 AM
to Kylo Community
I have put together reproducible steps for converting the Kylo Sandbox AMI (kylo-hdp-sandbox-0.10.0 (ami-045f04dfd883a1c8d)) to use EMR and reproduce the problem.


# Verify Data Transform Works on The Sandbox

1) Launch the Kylo Sandbox AMI
* Make sure you can SSH into this instance.
* Make sure this instance is in the 'default' security group so that it can communicate with the EMR instances.
2) Wait for Ambari to fully load all services before continuing.
* <KYLO_PUBLIC_DNS>:8080

3) Create a Data Transformation Feed
* Using my sample data, demostatsbyzip_s.xml, the transformation can be done with:
** Flatten
** Explode
** Flatten
* All other feed properties can be set to defaults.

4) Run the feed. Expect success.




# Convert the Sandbox to use an EMR Cluster
1) Create an EFS, note the file system id (a CLI sketch follows below).
** e.g. fs-c9eb96b0
* Make sure the EFS is in the 'default' security group so that it can be mounted on all instances.
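
If you prefer the AWS CLI, something along these lines should work (a sketch only; the subnet and security group ids are placeholders, and the CLI must already be configured with credentials):

aws efs create-file-system --creation-token kylo-emr-efs
aws efs describe-file-systems --query 'FileSystems[].FileSystemId'
# one mount target per subnet used by the Kylo and EMR instances, in the 'default' security group
aws efs create-mount-target --file-system-id <YOUR EFS ID> --subnet-id <SUBNET_ID> --security-groups <DEFAULT_SG_ID>
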
2) Launch an EMR Cluster (an equivalent AWS CLI invocation is sketched after this list)
1) Go to Advanced Options
2) Release: emr-5.15.0
3) Applications: Hadoop, Hive, Hue, Spark, Tez, HCatalog, Pig
4) Uniform instance groups (default)
5) Pick an EC2 key pair so you can SSH into the instances.
6) Set the security groups so you can SSH into the instances.
7) Add the default security group to all instances so Kylo can communicate with them.
8) Launch the cluster.
9) Once the cluster is launched, rename the master and nodes to differentiate them.
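
Roughly the same cluster can be created from the CLI (a sketch; the key pair and security group are placeholders and the instance types are only examples):

aws emr create-cluster \
  --name "kylo-emr-repro" \
  --release-label emr-5.15.0 \
  --applications Name=Hadoop Name=Hive Name=Hue Name=Spark Name=Tez Name=HCatalog Name=Pig \
  --ec2-attributes KeyName=<YOUR_KEY_PAIR>,AdditionalMasterSecurityGroups=<DEFAULT_SG_ID>,AdditionalSlaveSecurityGroups=<DEFAULT_SG_ID> \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m4.xlarge \
  --use-default-roles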

3) Wait for Ambari to fully start all services. Once all services are up, stop all services (one way to do this via the Ambari REST API is sketched below). 
* <KYLO_PUBLIC_DNS>:8080
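
Stopping everything can also be done with a single Ambari REST call instead of clicking through the UI (a sketch; <CLUSTER_NAME> is whatever Ambari shows for the sandbox cluster, and admin/admin is the default sandbox login):

curl -u admin:admin -H 'X-Requested-By: ambari' -X PUT \
  -d '{"RequestInfo":{"context":"Stop All Services"},"Body":{"ServiceInfo":{"state":"INSTALLED"}}}' \
  http://<KYLO_PUBLIC_DNS>:8080/api/v1/clusters/<CLUSTER_NAME>/services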

4) Mount the EFS on the Kylo Instance

sudo su

yum -y install git
yum -y install make
yum -y install rpm-build

# the sandbox does not ship amazon-efs-utils, so build it from source
git clone https://github.com/aws/efs-utils
cd efs-utils
make rpm
yum -y --nogpgcheck install ./build/amazon-efs-utils*rpm

mkdir /mnt/efs
echo "<YOUR EFS ID>:/ /mnt/efs efs defaults,_netdev 0 0" >> /etc/fstab
mount -a
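
Optionally, confirm the mount before continuing (read-only check):

df -h /mnt/efs
mount | grep /mnt/efs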

5) Mount the EFS on the EMR Master and Both EMR Nodes

sudo su

yum install -y amazon-efs-utils

mkdir /mnt/efs
echo "<YOUR EFS ID>:/ /mnt/efs efs defaults,_netdev 0 0" >> /etc/fstab
mount -a

6) Kylo - Prepare Supergroup and Update Users

groupadd -g 2001 supergroup

usermod -g supergroup nifi
usermod -g supergroup kylo

sudo -u kylo bash -c "echo 'umask 002' >> /home/kylo/.bash_profile"

7) EMR Master - Prepare Supergroup and Users

groupadd -g 2001 supergroup

useradd -g 2001 -r -m -s /bin/bash nifi
useradd -g 2001 -r -m -s /bin/bash kylo

usermod -a -G supergroup root
usermod -a -G supergroup hadoop

su - hdfs
hdfs dfs -mkdir /user/kylo
hdfs dfs -chown kylo:supergroup /user/kylo
hdfs dfs -mkdir /user/nifi
hdfs dfs -chown nifi:supergroup /user/nifi
hdfs dfs -ls /user
exit

8) Update Kylo for Uploads

echo "catalog.uploads.group=supergroup" >> /opt/kylo/kylo-services/conf/application.properties
echo "catalog.uploads.permission=775" >> /opt/kylo/kylo-services/conf/application.properties

9) Update EMR Master for Uploads

sed -i 's/<value>hadoop<\/value>/<value>supergroup<\/value>/g' /etc/hadoop/conf.empty/hdfs-site.xml

stop hadoop-hdfs-namenode
start hadoop-hdfs-namenode

10) EMR Master - Disable the LZO Codec

sed -i 's/,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec//g' /etc/hadoop/conf.empty/core-site.xml
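
A quick sanity check that the sed removed the LZO entries (the grep should report 0 matches):

grep -c 'com.hadoop.compression.lzo' /etc/hadoop/conf.empty/core-site.xml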

11) Prepare to Copy EMR Libraries to the Edge Node

# Kylo Instance:

ssh-keygen -t rsa -b 2048 -f /root/.ssh/id_rsa -q -P ""
cp /root/.ssh/id_rsa.pub /mnt/efs

# EMR Master:

cp /mnt/efs/id_rsa.pub /home/hadoop/.ssh/
cat /home/hadoop/.ssh/id_rsa.pub >> /home/hadoop/.ssh/authorized_keys
chown hadoop:hadoop /home/hadoop/.ssh/id_rsa.pub

12) Copy EMR Libraries to the Edge Node

export MASTER_PRIVATE_IP=<EMR_MASTER_PRIVATE_IP>
export PEM_FILE=/root/.ssh/id_rsa

mkdir -p /usr/lib/spark
mkdir -p /usr/lib/hive-webhcat/share/hcatalog

echo 'export SPARK_HOME=/usr/lib/spark' >> /etc/profile.d/spark.sh
echo 'export PATH=$SPARK_HOME/bin:$PATH' >> /etc/profile.d/spark.sh
echo 'export HADOOP_CONF_DIR=/etc/hadoop/conf' >> /etc/profile.d/spark.sh
echo 'export SPARK_CONF_DIR=/etc/spark/conf' >> /etc/profile.d/spark.sh

source /etc/profile.d/spark.sh

unlink /etc/hadoop/conf
mkdir -p /etc/hadoop/conf
chown -R kylo:kylo /etc/hadoop/conf

unlink /etc/spark/conf
mkdir -p /etc/spark/conf
chown -R kylo:kylo /etc/spark/conf

mkdir -p /usr/share/aws /usr/lib/sqoop /usr/lib/hadoop-yarn /usr/lib/hadoop-mapreduce /usr/lib/hadoop-hdfs /usr/lib/hadoop
chown kylo:kylo /usr/share/aws /usr/lib/sqoop /usr/lib/hadoop-yarn /usr/lib/hadoop-mapreduce /usr/lib/hadoop-hdfs /usr/lib/hadoop

scp -o StrictHostKeyChecking=no -i $PEM_FILE hadoop@$MASTER_PRIVATE_IP:/etc/hadoop/conf/core-site.xml /etc/hadoop/conf
scp -o StrictHostKeyChecking=no -i $PEM_FILE hadoop@$MASTER_PRIVATE_IP:/etc/hadoop/conf/yarn-site.xml /etc/hadoop/conf
scp -o StrictHostKeyChecking=no -i $PEM_FILE hadoop@$MASTER_PRIVATE_IP:/etc/hadoop/conf/hdfs-site.xml /etc/hadoop/conf
scp -o StrictHostKeyChecking=no -i $PEM_FILE hadoop@$MASTER_PRIVATE_IP:/etc/hadoop/conf/mapred-site.xml /etc/hadoop/conf

rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/lib/spark/*' /usr/lib/spark
rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/lib/sqoop/*' /usr/lib/sqoop
rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/lib/hadoop/*' /usr/lib/hadoop
rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/lib/hadoop-yarn/*' /usr/lib/hadoop-yarn
rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/lib/hadoop-mapreduce/*' /usr/lib/hadoop-mapreduce
rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/lib/hadoop-hdfs/*' /usr/lib/hadoop-hdfs
rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/usr/share/aws/*' /usr/share/aws

rsync -avz --delete -e "ssh -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE" hadoop@$MASTER_PRIVATE_IP:'/etc/spark/conf/*' /etc/spark/conf

echo "spark.hadoop.yarn.timeline-service.enabled false" >> /etc/spark/conf/spark-defaults.conf

scp -o StrictHostKeyChecking=no -o ServerAliveInterval=10 -i $PEM_FILE hadoop@$MASTER_PRIVATE_IP:/usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core-2.*-amzn-*.jar /usr/lib/hive-webhcat/share/hcatalog/hive-hcatalog-core.jar

13) Test Spark

spark-submit --class org.apache.spark.examples.SparkPi --master yarn --driver-memory 512m --executor-memory 512m --executor-cores 1 /usr/lib/spark/examples/jars/spark-examples_2.11-2.3.0.jar 10

# Check the Yarn UI to verify it was successful
http://<MASTER_NODE_PUBLIC_DNS>:8088/cluster

14) Get the Hive Metastore DB Password from the EMR Master

cat /etc/hive/conf/hive-site.xml | grep -B 5 -A 5 Password
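
If xmllint happens to be installed on the master, the value can be pulled out directly instead of eyeballing the grep output (javax.jdo.option.ConnectionPassword is the standard Hive metastore JDBC password property; this is only a convenience):

xmllint --xpath "string(//property[name='javax.jdo.option.ConnectionPassword']/value)" /etc/hive/conf/hive-site.xml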

15) Update Kylo Configuration

HIVE_PW=<PASSWORD ABOVE>
EMR_MASTER_DNS=<EMR MASTER PRIVATE DNS>
KYLO_EDGE_DNS=<KYLO PRIVATE DNS>

sed -i 's[^hive.datasource.url.*$[hive.datasource.url=jdbc:hive2://'"$EMR_MASTER_DNS"':10000/default[g' /opt/kylo/kylo-services/conf/application.properties
sed -i 's/^hive.datasource.username.*$/hive.datasource.username=hive/g' /opt/kylo/kylo-services/conf/application.properties
sed -i 's[^hive.metastore.datasource.url.*$[hive.metastore.datasource.url=jdbc:mysql://'"$EMR_MASTER_DNS"':3306/hive[g' /opt/kylo/kylo-services/conf/application.properties
sed -i 's[^hive.metastore.datasource.username.*$[hive.metastore.datasource.username=hive[g' /opt/kylo/kylo-services/conf/application.properties
sed -i 's[^hive.metastore.datasource.password.*$[hive.metastore.datasource.password='"$HIVE_PW"'[g' /opt/kylo/kylo-services/conf/application.properties

sed -i 's[^nifi.service.hive_thrift_service.database_connection_url.*$[nifi.service.hive_thrift_service.database_connection_url=jdbc:hive2://'"$EMR_MASTER_DNS"':10000/default[g' /opt/kylo/kylo-services/conf/application.properties

sed -i 's[^nifi.executesparkjob.sparkhome.*$[nifi.executesparkjob.sparkhome=/usr/lib/spark[g' /opt/kylo/kylo-services/conf/application.properties
sed -i 's[^nifi.executesparkjob.sparkmaster.*$[nifi.executesparkjob.sparkmaster=yarn-cluster[g' /opt/kylo/kylo-services/conf/application.properties
sed -i 's[^config.spark.validateAndSplitRecords.extraJars.*$[config.spark.validateAndSplitRecords.extraJars=/usr/lib/hive-webhcat/share/hcatalog/hive-hcatalog-core.jar,/usr/lib/spark/jars/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark/jars/datanucleus-core-3.2.10.jar,/usr/lib/spark/jars/datanucleus-rdbms-3.2.9.jar[g' /opt/kylo/kylo-services/conf/application.properties

echo "nifi.executesparkjob.extra_jars=/usr/lib/spark/jars/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark/jars/datanucleus-core-3.2.10.jar,/usr/lib/spark/jars/datanucleus-rdbms-3.2.9.jar" >> /opt/kylo/kylo-services/conf/application.properties
echo 'nifi.executesparkjob.extra_files=$nifi{table_field_policy_json_file},/etc/spark/conf/hive-site.xml' >> /opt/kylo/kylo-services/conf/application.properties
echo "config.spark.version=2" >> /opt/kylo/kylo-services/conf/application.properties

echo "spark.shell.deployMode = cluster" >> /opt/kylo/kylo-services/conf/spark.properties
sed -i 's/^spark.shell.master.*$/spark.shell.master=yarn/g' /opt/kylo/kylo-services/conf/spark.properties
echo "spark.shell.registrationUrl=http://$KYLO_EDGE_DNS:8400/proxy/v1/spark/shell/register" >> /opt/kylo/kylo-services/conf/spark.properties

16) Update NiFi env to include ENCRYPT_KEY

# This causes ENCRYPT_KEY to be present in the env map that ExecuteSparkJob passes to SparkLauncher(env)
echo '' >> /opt/nifi/current/bin/nifi-env.sh
echo 'export ENCRYPT_KEY="$(< /opt/nifi/ext-config/encrypt.key)"' >> /opt/nifi/current/bin/nifi-env.sh

17) Make the EFS Dropzone

mkdir /mnt/efs/dropzone
chown kylo:supergroup /mnt/efs/dropzone

18) Restart Services

service nifi restart
kylo-service restart

# Wait for kylo and nifi to come up before continuing

19) Go to Kylo and Add a Catalog for File Upload

Title: EFS Dropzone
Path: file:/mnt/efs/dropzone

20) Add Data Transform Feed

Name: DT Test
Category: Website

Transformation:
Browse Catalog
EFS Dropzone - File Upload
Add File
demostatsbyzip_s.xml (Attached)
Next
# Wait for spark to digest the xml
Add
Next
Flatten Structure
Explode to Rows
Flatten Structure
Save

Target:
Index: _address, _id, _position, _uuid
Next
Select Merge
Save

Additional Options - Continue

Deploy
Don't enable immediately

21) Fix NiFi Templates and Controllers for EMR

NiFi Flow >> reusable_templates >> data-transform >> Execute Script
** Main Args: ${transform_script_file:substringAfterLast('/')}
** Extra Files: ${table_field_policy_json_file},/etc/spark/conf/hive-site.xml,${transform_script_file}
** SparkHome: /usr/lib/spark
** SparkMaster: yarn
** Spark YARN Deploy Mode: cluster
** Extra JARs: /usr/lib/spark/jars/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark/jars/datanucleus-core-3.2.10.jar,/usr/lib/spark/jars/datanucleus-rdbms-3.2.9.jar

NiFi Flow >> reusable_templates >> data-transform >> Register Tables
** Hive Thrift Services
*** jdbc:hive2://<EMR_MASTER_PRIVATE_DNS>:10000/default

22) In Kylo, Start the Feed
# The feed will fail in NiFi at: NiFi Flow >> reusable_templates >> data-transform >> Execute Script

23) View the error for the Spark application in the YARN UI

http://<MASTER_NODE_PUBLIC_DNS>:8088/cluster

The error is the same as before: "No encryption/decryption key has been configured - please see configuration documentation".
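
The same message can also be pulled from the command line on the EMR master once you have the application id from the YARN UI (standard YARN tooling, assuming log aggregation is enabled, which it normally is on EMR):

yarn logs -applicationId <APPLICATION_ID> | grep -A 5 'No encryption/decryption key'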




# Why this is occurring

SparkLauncher launcher = new SparkLauncher(env)

in ExecuteSparkJob.java (line 518) does not get `env` through to the Spark driver when running against AWS EMR. The env map is only applied to the spark-submit child process on the edge node; with the yarn master in cluster deploy mode the driver runs inside a YARN container on the EMR cluster, so ENCRYPT_KEY never reaches it unless it is forwarded explicitly (e.g. via spark.yarn.appMasterEnv.*).

This seems to be a bug in Kylo.
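
The underlying Spark behaviour can be seen with a plain spark-submit, independent of Kylo (reusing the SparkPi jar from the test step above; SparkPi ignores the variable, this only illustrates where the environment is and is not visible):

# ENCRYPT_KEY here is only set for the spark-submit process on the edge node;
# in cluster deploy mode the driver/AM container does not inherit it
ENCRYPT_KEY=dummy spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi /usr/lib/spark/examples/jars/spark-examples_2.11-2.3.0.jar 10

# forwarding it via spark.yarn.appMasterEnv.* does put it into the AM/driver container environment
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.appMasterEnv.ENCRYPT_KEY=dummy \
  --class org.apache.spark.examples.SparkPi /usr/lib/spark/examples/jars/spark-examples_2.11-2.3.0.jar 10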

demostatsbyzip_s.xml

Ben S

Apr 29, 2019, 11:03:52 AM
to Kylo Community
For additional verification, I built my own ExecuteSparkJob with extra logging:

...
            addEncryptionSettings(env);

            logger.error("LOGGING ENV");
            env.forEach((key, val) -> logger.error("[" + key + "," + val + "]"));
            
            /* Launch the spark job as a child process */
            SparkLauncher launcher = new SparkLauncher(env)
...

Which outputs:

2019-04-29 14:58:54,347 ERROR [Timer-Driven Process Thread-7] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=d4f7b3f3-eb75-3695-d7f0-1ea88d97b24a] LOGGING ENV
2019-04-29 14:58:54,358 ERROR [Timer-Driven Process Thread-7] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=d4f7b3f3-eb75-3695-d7f0-1ea88d97b24a] [DATASETS,[{"roleMemberships":[],"feedRoleMemberships":[],"id":"da749c24-03f0-458f-93d9-1068888a4b1f","title":"/demostatsbyzip_s.xml","dataSource":{"owner":{"displayName":"Data Lake Administrator","email":null,"enabled":true,"groups":["admin","user"],"systemName":"dladmin"},"allowedActions":{},"roleMemberships":[],"feedRoleMemberships":[],"connector":{"allowedActions":{},"roleMemberships":[],"feedRoleMemberships":[],"id":"b4d19895-027d-4888-ad52-f89324f383b9","pluginId":"local-file-system","title":"Local File System","template":{"files":[],"format":"","jars":[],"options":{},"paths":[]},"icon":"fas:folder"},"id":"8c5c9ba2-550b-43e8-a3bf-db71e2bb9800","template":{"files":[],"format":"","jars":[],"options":{},"paths":["file:/mnt/efs/dropzone"]},"title":"EFS Dropzone"},"format":"xml","options":{"format":"xml","valueTag":"_VALUE","attributePrefix":"_","rowTag":"response"},"paths":["file:/mnt/efs/dropzone/ea7a24d6-00b0-4b5d-8c6b-310337ce7fa5/demostatsbyzip_s.xml"]}]]
2019-04-29 14:58:54,358 ERROR [Timer-Driven Process Thread-7] c.t.nifi.v2.spark.ExecuteSparkJob ExecuteSparkJob[id=d4f7b3f3-eb75-3695-d7f0-1ea88d97b24a] [ENCRYPT_KEY,<MY ENCRYPT_KEY WAS HERE>]

Which is expected.

Ben S

Apr 30, 2019, 5:39:02 AM
to Kylo Community
I was able to get ExecuteSparkJob working by building my own version from the source code and adding the following:

env.forEach((key, val) -> launcher.setConf("spark.yarn.appMasterEnv." + key, StringEscapeUtils.escapeJava(val)));

Right after SparkLauncher's instantiation.