HopsworksSqoopOperator to move a generic table from MySQL to HDFS


arse...@gmail.com

Dec 17, 2019, 10:07:36 AM
to Hops
Hi, 

I would like to see a simple example DAG (.py) that uses the Airflow HopsworksSqoopOperator to move a generic table from MySQL to HDFS.
I know it is similar to SqoopOperator, but I would like to see an example with HopsworksSqoopOperator anyway.

Could you help me?

Thanks,
Antony

Theo Kakantousis

Dec 17, 2019, 6:03:11 PM
to Hops
Hi,

I've attached an example of a Sqoop DAG that reads from a MySQL table and writes to a dataset in a project. You need to create the JDBC connection in Airflow first.
sqoop_dag.py
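
For reference, a minimal sketch of what such a DAG could look like, based on the operator arguments used later in this thread. The plugin import path, DAG id, owner and start date are assumptions; adjust them for your installation.

# Minimal sketch only -- the hopsworks_plugin import path, owner and DAG id
# are assumptions; the operator arguments match the ones used later in this thread.
from airflow import DAG
from airflow.utils.dates import days_ago
from hopsworks_plugin.operators.hopsworks_operator import HopsworksSqoopOperator  # assumed import path

CONNECTION_ID = "hopsworks_jdbc"   # JDBC connection created in Airflow beforehand
PROJECT_NAME = "TestP"

args = {
    'owner': 'meb10000',           # project user, as seen in the staging path later in the thread
    'start_date': days_ago(1)
}

dag = DAG(dag_id='sqoop_mysql_to_hdfs', default_args=args, schedule_interval=None)

sqoop_import = HopsworksSqoopOperator(
    task_id='sqoop_import_my_tbl',
    dag=dag,
    conn_id=CONNECTION_ID,
    project_name=PROJECT_NAME,
    table='MY_TBL',
    target_dir='/Projects/TestP/sqoop-import/MY_TBL',
    cmd_type='import',
    driver='com.mysql.jdbc.Driver',
    file_type='text',
    verbose=False
)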

arse...@gmail.com

Dec 18, 2019, 11:15:34 AM
to Hops
Hi,
thanks for the reply, really appreciated.

Using BashOperator, as shown below, it works:

task1 = BashOperator(
    task_id='sqoop_prova_1',
    bash_command='HADOOP_USER_NAME=hdfs /opt/hops/sqoop/bin/sqoop import --connect \'jdbc:mysql://MY_URL:3306/MY_DB\' --username MY_USR --password MY_PSW --table MY_TBL --driver com.mysql.jdbc.Driver --target-dir /Projects/TestP/sqoop-import/MY_TBL -m 1',
    dag=dag)


Using HopsworksSqoopOperator, as you showed me, it does not work:

CONNECTION_ID = "hopsworks_jdbc"
PROJECT_NAME = "TestP"

task1 = HopsworksSqoopOperator(
    task_id='sqoop_prova_4',
    dag=dag,
    conn_id=CONNECTION_ID,
    project_name=PROJECT_NAME,
    table='MY_TBL',
    target_dir='/Projects/TestP/sqoop-import/MY_TBL',
    verbose=False,
    cmd_type='import',
    driver="com.mysql.jdbc.Driver",
    file_type='text'
)


I get this error in the Hadoop log:

2019-12-18 15:54:24,134 WARN io.hops.transaction.handler.RequestHandler: SET_REPLICATION TX Failed. TX Time: 2 ms, RetryCount: 0, TX Stats -- Setup: 0ms, AcquireLocks: 2ms, InMemoryProcessing: -1ms, CommitTime: -1ms. Locks: INodeLock {paths=[/Projects/TestP/Resources/.mrStaging/TestP__meb10000/.staging/job_1576656122889_0005/job.split], lockType=WRITE_ON_TARGET_AND_PARENT }. java.lang.IllegalArgumentException
java.lang.IllegalArgumentException
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:77)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.getStorageTypeDeltas(FSDirectory.java:578)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.updateCount(FSDirectory.java:485)
        at org.apache.hadoop.hdfs.server.namenode.FSDirAttrOp.unprotectedSetReplication(FSDirAttrOp.java:602)
        at org.apache.hadoop.hdfs.server.namenode.FSDirAttrOp$4.performTask(FSDirAttrOp.java:260)
        at io.hops.transaction.handler.TransactionalRequestHandler.execute(TransactionalRequestHandler.java:100)
        at io.hops.transaction.handler.HopsTransactionalRequestHandler.execute(HopsTransactionalRequestHandler.java:50)
        at io.hops.transaction.handler.RequestHandler.handle(RequestHandler.java:68)
        at io.hops.transaction.handler.RequestHandler.handle(RequestHandler.java:63)
        at org.apache.hadoop.hdfs.server.namenode.FSDirAttrOp.setReplication(FSDirAttrOp.java:272)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.setReplication(FSNamesystem.java:1442)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.setReplication(NameNodeRpcServer.java:560)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.setReplication(ClientNamenodeProtocolServerSideTranslatorPB.java:463)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:996)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:850)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:793)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1929)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2786)



Thanks a lot,
Antony

gautier.berthou

Dec 19, 2019, 4:29:41 AM
to Hops
Hi,

It may be a bug on our side. Could you check and tell me the size of the file for which it is trying to change the replication: /Projects/TestP/Resources/.mrStaging/TestP__meb10000/.staging/job_1576656122889_0005/job.split ?
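
You can check it with something like the following (the hdfs binary location is a guess based on the sqoop path you used earlier; adjust it for your installation):

HADOOP_USER_NAME=hdfs /opt/hops/hadoop/bin/hdfs dfs -du -h /Projects/TestP/Resources/.mrStaging/TestP__meb10000/.staging/job_1576656122889_0005/job.split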

Regards,
Gautier

arse...@gmail.com

Dec 19, 2019, 11:06:31 AM
to Hops
Hi Gautier,
thanks for the reply.


The MySQL table that I'm trying to import using HopsworksSqoopOperator is the same one that I can successfully import using BashOperator, as shown above. The size of the table is 13 KB.

Anyway, using HopsworksSqoopOperator, I noticed this exception in the Airflow logs:

airflow.exceptions.AirflowException: Sqoop command failed: sqoop import -D org.apache.sqoop.splitter.allow_text_splitter=true -D yarn.app.mapreduce.am.staging-dir=/Projects/TestP/Resources/.mrStaging -D yarn.app.mapreduce.client.max-retries=10 --username MY_USR --password MASKED --connect jdbc:mysql://MY_URL:3306/MY_TBL --target-dir /Projects/TestP/sqoop-import/MY_TBL --as-textfile --split-by zip --driver com.mysql.jdbc.Driver --table MY_TBL


So, to investigate, I tried to launch the same command from the command line:

HADOOP_USER_NAME=hdfs /opt/hops/sqoop/bin/sqoop import -D yarn.app.mapreduce.am.staging-dir=/Projects/TestP/Resources/.mrStaging -D yarn.app.mapreduce.client.max-retries=10 --username MY_USR --password MY_PWD --connect jdbc:mysql://MY_URL:3306/MY_TBL --target-dir /Projects/TestP/sqoop-import/MY_TBL --as-textfile --driver com.mysql.jdbc.Driver --table MY_TBL

But it gives me the same error reported in the previous post, in the Hadoop log.
So I tried removing the generic argument "-D yarn.app.mapreduce.am.staging-dir=/Projects/TestP/Resources/.mrStaging" from the sqoop import and launching it from the command line:

HADOOP_USER_NAME=hdfs /opt/hops/sqoop/bin/sqoop import -D yarn.app.mapreduce.client.max-retries=10 --username MY_USR --password MY_PWD --connect jdbc:mysql://MY_URL:3306/MY_TBL --target-dir /Projects/TestP/sqoop-import/MY_TBL --as-textfile --driver com.mysql.jdbc.Driver --table MY_TBL

And it finally works!

Why does HopsworksSqoopOperator automatically add the generic Sqoop argument "-D yarn.app.mapreduce.am.staging-dir=/Projects/TestP/Resources/.mrStaging"?
And why is it a problem for us?


Thanks a lot,
Antony

Theo Kakantousis

Dec 20, 2019, 4:56:59 AM
to Hops
Hi,

This property is set by the HopsworksSqoopOperator; otherwise it would default to a global staging directory that project members don't have permission to write to. Also, it wouldn't be safe to stage users' files in the same directory. It's part of Hopsworks multi-tenancy.
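
To illustrate the idea only (this is a sketch, not the actual operator source), the operator effectively injects per-project properties that end up as -D generic arguments on the generated command line, as in the sqoop command posted above:

# Illustrative sketch, not the HopsworksSqoopOperator source code.
def project_staging_properties(project_name):
    # Stage MapReduce job files inside the project instead of a shared global dir.
    staging_dir = "/Projects/{}/Resources/.mrStaging".format(project_name)
    return {
        "yarn.app.mapreduce.am.staging-dir": staging_dir,
        "yarn.app.mapreduce.client.max-retries": "10",
    }

# Each entry becomes a "-D key=value" generic argument, e.g.:
#   sqoop import -D yarn.app.mapreduce.am.staging-dir=/Projects/TestP/Resources/.mrStaging ...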

gautier.berthou

Dec 20, 2019, 10:59:11 AM
to Hops
Hi,

The error you were getting is due to a bug in HopsFS. It has been fixed and pushed to versions 2.8.2.8 and 2.8.2.9 of HopsFS, so you should not have this problem anymore if you reinstall the NameNode.

Best regards,
Gautier

arse...@gmail.com

Jan 8, 2020, 4:15:58 AM
to Hops
Hi,

I'm already using version 2.8.2.8 of HopsFS.
Do I have to install version 2.8.2.9?

Best regards,
Antonio

Theo Kakantousis

Jan 8, 2020, 4:21:15 AM
to Hops
You need to re-install Hops 2.8.2.8 manually (or use Karamel for a clean installation of Hopsworks 1.0), or install Hopsworks 1.1, which comes with Hops 2.8.2.9.