SFTP connection


Imran Hassanali

Aug 30, 2018, 6:06:00 PM
to cloud-compo...@googlegroups.com
Hello All,

I am attempting to connect to an SFTP server from Airflow.

According to the docs, to use your id_rsa key in the SSH connection you need to pass a "key_file" entry as JSON in the connection's Extra field.

For example:

{"key_file": "/usr/local/airflow/.ssh/id_rsa"}

Is this possible when running Airflow on Cloud Composer? Is there another way to do this, for example storing the key in Kubernetes configuration within GCP rather than directly on the Airflow hosts?
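As a rough illustration of the Extra value described above, the JSON can be generated rather than typed by hand, which avoids quoting mistakes. This is only a sketch: `ssh_extra` is a hypothetical helper, and the `/home/airflow/gcs/data/id_rsa` path is an assumption about where a key uploaded to the environment bucket's `data/` folder would appear on a Composer worker.

```python
import json


def ssh_extra(key_file):
    """Build the JSON string for the SSH connection's Extra field.

    Hypothetical helper: it only serialises the "key_file" entry
    mentioned in the post and checks nothing about the key itself.
    """
    return json.dumps({"key_file": key_file})


# Assumed path: a key stored under the Composer bucket's data/ folder.
print(ssh_extra("/home/airflow/gcs/data/id_rsa"))
```

The printed string can be pasted into the Extra field of the SSH connection in the Airflow UI.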

Thank you,

--
Imran Hassanali | im...@essential.com

Imran Hassanali

Aug 30, 2018, 9:39:26 PM
to cloud-compo...@googlegroups.com
Following up: I was able to figure out how to add the key file. You have to specify the scope and the bucket path as follows:

{"extra__google_cloud_platform__scope":"https://www.googleapis.com/auth/cloud-platform", "extra__google_cloud_platform__project":"YOURPROJECT", "extra__google_cloud_platform__key_path":"/YOURBUCKET/id_rsa","no_host_key_check": "true"}

However, I am still unable to connect to any FTP servers. I tried two different server hosts, both of which have working connections outside Airflow. For one server I used the SSH hook method; the other is a plain username/password FTP server. Both give me the same error, "SSHException: Error reading SSH protocol banner".

Here is the error I get when using the following SFTPOperator:
sftp = SFTPOperator(
    task_id='fetch_data',
    ssh_conn_id='fih_dallas_ftp',
    local_filepath='/',
    remote_filepath='/path/to/file',
    operation='GET',
    dag=dag
)
[2018-08-31 01:09:52,189] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,189] {ssh_hook.py:159} ERROR - Failed connecting to host: <redacted>, error: No existing session
[2018-08-31 01:09:52,500] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-08-31 01:09:52,502] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-08-31 01:09:52,502] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-08-31 01:09:52,504] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-08-31 01:09:52,504] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-08-31 01:09:52,504] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-08-31 01:09:52,505] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-08-31 01:09:52,505] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-08-31 01:09:52,507] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-08-31 01:09:52,507] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/sftp_operator.py", line 94, in execute
[2018-08-31 01:09:52,533] {base_task_runner.py:98} INFO - Subtask:     .format(file_msg, str(e)))
[2018-08-31 01:09:52,534] {base_task_runner.py:98} INFO - Subtask: airflow.exceptions.AirflowException: Error while transferring None, error: 'NoneType' object has no attribute 'open_sftp'
[2018-08-31 01:09:52,592] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,591] {transport.py:1687} ERROR - Exception: Error reading SSH protocol banner
[2018-08-31 01:09:52,627] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,625] {transport.py:1685} ERROR - Traceback (most recent call last):
[2018-08-31 01:09:52,627] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,625] {transport.py:1685} ERROR -   File "/usr/local/lib/python2.7/site-packages/paramiko/transport.py", line 1893, in run
[2018-08-31 01:09:52,628] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,625] {transport.py:1685} ERROR -     self._check_banner()
[2018-08-31 01:09:52,628] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,625] {transport.py:1685} ERROR -   File "/usr/local/lib/python2.7/site-packages/paramiko/transport.py", line 2049, in _check_banner
[2018-08-31 01:09:52,629] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,625] {transport.py:1685} ERROR -     'Error reading SSH protocol banner' + str(e)
[2018-08-31 01:09:52,631] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:09:52,625] {transport.py:1685} ERROR - SSHException: Error reading SSH protocol banner
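
The "Error reading SSH protocol banner" message means the SSH client did not receive the identification line (starting with "SSH-") that an SSH/SFTP server sends right after the TCP connection opens. One possible cause, not confirmed in this thread, is that the host and port answer with a different protocol; a plain FTP server, for instance, greets with a "220" line. A minimal sketch for checking this from a worker using only the standard library follows. `read_banner` and `classify_banner` are hypothetical helper names, and the host and port you pass in are whatever the Airflow connection uses.

```python
import socket


def classify_banner(banner):
    """Guess the protocol from the first bytes a server sends after connect."""
    text = banner.decode("ascii", errors="replace").strip()
    if text.startswith("SSH-"):
        return "ssh"      # SSH identification string, so SFTP can work here
    if text.startswith("220"):
        return "ftp"      # 220 greeting, typical of plain FTP (also used by SMTP)
    if not text:
        return "silent"   # nothing received before the timeout
    return "unknown"


def read_banner(host, port=22, timeout=10.0):
    """Open a plain TCP connection and return whatever the server sends first."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.settimeout(timeout)
        try:
            return sock.recv(256)
        except socket.timeout:
            return b""


# Example with canned banners (no network access needed):
print(classify_banner(b"SSH-2.0-OpenSSH_7.4\r\n"))
print(classify_banner(b"220 FTP server ready\r\n"))
```

If `classify_banner(read_banner(host, port))` reports anything other than "ssh", the SFTPOperator and SSHHook cannot negotiate a session on that port, whatever credentials are configured.
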
Here is what I get when trying to use the SSH connection with the ssh key file:
ssh_hook = SSHHook(ssh_conn_id='asurion_ssh')
with ssh_hook.get_conn() as ssh_client:
[2018-08-31 01:17:26,222] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,222] {ssh_hook.py:159} ERROR - Failed connecting to host: <redacted>, error: No existing session
[2018-08-31 01:17:26,447] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-08-31 01:17:26,448] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-08-31 01:17:26,448] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-08-31 01:17:26,448] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-08-31 01:17:26,449] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-08-31 01:17:26,449] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-08-31 01:17:26,449] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-08-31 01:17:26,450] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-08-31 01:17:26,450] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-08-31 01:17:26,451] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-08-31 01:17:26,451] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-08-31 01:17:26,451] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-08-31 01:17:26,452] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-08-31 01:17:26,452] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/gcs/dags/create_extended_care_records.py", line 187, in FlowOperatorCombineSearchesAndUploadToSFTP
[2018-08-31 01:17:26,452] {base_task_runner.py:98} INFO - Subtask:     with ssh_hook.get_conn() as ssh_client:
[2018-08-31 01:17:26,453] {base_task_runner.py:98} INFO - Subtask: AttributeError: __exit__
[2018-08-31 01:17:26,525] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,523] {transport.py:1687} ERROR - Exception: Error reading SSH protocol banner
[2018-08-31 01:17:26,526] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,524] {transport.py:1685} ERROR - Traceback (most recent call last):
[2018-08-31 01:17:26,526] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,524] {transport.py:1685} ERROR -   File "/usr/local/lib/python2.7/site-packages/paramiko/transport.py", line 1893, in run
[2018-08-31 01:17:26,526] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,524] {transport.py:1685} ERROR -     self._check_banner()
[2018-08-31 01:17:26,527] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,524] {transport.py:1685} ERROR -   File "/usr/local/lib/python2.7/site-packages/paramiko/transport.py", line 2049, in _check_banner
[2018-08-31 01:17:26,527] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,524] {transport.py:1685} ERROR -     'Error reading SSH protocol banner' + str(e)
[2018-08-31 01:17:26,527] {base_task_runner.py:98} INFO - Subtask: [2018-08-31 01:17:26,524] {transport.py:1685} ERROR - SSHException: Error reading SSH protocol banner
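
The "AttributeError: __exit__" in the traceback above looks like a follow-on symptom rather than a separate problem. Judging by the "'NoneType' object has no attribute 'open_sftp'" message in the first log, the hook appears to log the connection failure and hand back None, which cannot be used in a `with` statement. A minimal sketch of a guard that surfaces the real failure is shown here; `require_client` is a hypothetical helper, not part of Airflow, and it does not fix the underlying connection error.

```python
def require_client(client, conn_id):
    """Return the client unchanged, or fail loudly if the hook gave back None."""
    if client is None:
        raise RuntimeError(
            "SSH connection %r could not be opened; check the host, port and "
            "credentials before using the client" % conn_id
        )
    return client


# Stand-in demonstration: None plays the role of a failed get_conn() result.
try:
    require_client(None, "asurion_ssh")
except RuntimeError as exc:
    print(exc)
```

In the DAG this would wrap the hook call, for example `ssh_client = require_client(ssh_hook.get_conn(), 'asurion_ssh')`, so that the task fails with a message pointing at the connection instead of at the `with` statement.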