ERROR - string longer than 2147483647 bytes with GoogleCloudStorageHook

Sean Davis

May 22, 2019, 8:23:39 PM
to cloud-composer-discuss
I am new to Composer but have been using Airflow locally. I have a DAG that downloads a file, parses it to JSON, and then uses a GoogleCloudStorageHook to upload the result back to GCS. The file is large (4+ GB). The DAG works on my Mac running Airflow 1.10.3 and Python 3 with a simple LocalExecutor. However, when I run the same DAG on Composer, I get the stack trace and log below. It looks like the `multipart` argument to the GCSHook is no longer used and that the underlying Google Cloud SDK is supposed to switch to multipart uploads for large files automatically; that may not be happening?

Any suggestions on what to do to troubleshoot?

Thanks.


-------------------------------- Stack Trace and Log ---------------------------------------

[2019-05-22 23:59:47,640] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-22 23:59:47,640] {process_sra_from_ncbi.py:57} INFO - Beginning upload to cloud storage: etl/run.json
[2019-05-22 23:59:47,641] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-22 23:59:47,640] {gcp_api_base_hook.py:94} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2019-05-22 23:59:47,718] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-22 23:59:47,717] {discovery.py:272} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2019-05-22 23:59:47,790] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-22 23:59:47,790] {_helpers.py:129} WARNING - __init__() takes at most 2 positional arguments (3 given)
[2019-05-23 00:00:00,677] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-23 00:00:00,676] {discovery.py:873} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/omicidx-cancerdatasci-org/o?name=etl%2Frun.json&alt=json&uploadType=media
[2019-05-23 00:00:00,688] {models.py:1796} ERROR - string longer than 2147483647 bytes
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/process_sra_from_ncbi.py", line 62, in execute
    mime_type='application/json'
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcs_hook.py", line 239, in upload
    .execute(num_retries=num_retries)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 846, in execute
    method=str(self.method), body=self.body, headers=self.headers)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 164, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/google_auth_httplib2.py", line 198, in request
    uri, method, body=body, headers=request_headers, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 155, in new_request
    redirections, connection_type)
  File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1924, in request
    cachekey,
  File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1595, in _request
    conn, request_uri, method, body, headers
  File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1502, in _conn_request
    conn.request(method, request_uri, body, headers)
  File "/opt/python3.6/lib/python3.6/http/client.py", line 1239, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/opt/python3.6/lib/python3.6/http/client.py", line 1285, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/opt/python3.6/lib/python3.6/http/client.py", line 1234, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/opt/python3.6/lib/python3.6/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/opt/python3.6/lib/python3.6/http/client.py", line 986, in send
    self.sock.sendall(data)
  File "/opt/python3.6/lib/python3.6/ssl.py", line 975, in sendall
    v = self.send(byte_view[count:])
  File "/opt/python3.6/lib/python3.6/ssl.py", line 944, in send
    return self._sslobj.write(data)
  File "/opt/python3.6/lib/python3.6/ssl.py", line 642, in write
    return self._sslobj.write(data)
OverflowError: string longer than 2147483647 bytes
[2019-05-23 00:00:00,832] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-23 00:00:00,688] {models.py:1796} ERROR - string longer than 2147483647 bytes
[2019-05-23 00:00:00,836] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml Traceback (most recent call last):
[2019-05-23 00:00:00,836] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
[2019-05-23 00:00:00,837] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     result = task_copy.execute(context=context)
[2019-05-23 00:00:00,837] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execute
[2019-05-23 00:00:00,837] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return_value = self.execute_callable()
[2019-05-23 00:00:00,837] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callable
[2019-05-23 00:00:00,838] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-05-23 00:00:00,839] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/home/airflow/gcs/dags/process_sra_from_ncbi.py", line 62, in execute
[2019-05-23 00:00:00,839] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     mime_type='application/json'
[2019-05-23 00:00:00,839] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcs_hook.py", line 239, in upload
[2019-05-23 00:00:00,840] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     .execute(num_retries=num_retries)
[2019-05-23 00:00:00,840] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-05-23 00:00:00,840] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return wrapped(*args, **kwargs)
[2019-05-23 00:00:00,840] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 846, in execute
[2019-05-23 00:00:00,840] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     method=str(self.method), body=self.body, headers=self.headers)
[2019-05-23 00:00:00,841] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 164, in _retry_request
[2019-05-23 00:00:00,842] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     resp, content = http.request(uri, method, *args, **kwargs)
[2019-05-23 00:00:00,842] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/google_auth_httplib2.py", line 198, in request
[2019-05-23 00:00:00,843] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     uri, method, body=body, headers=request_headers, **kwargs)
[2019-05-23 00:00:00,843] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 155, in new_request
[2019-05-23 00:00:00,843] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     redirections, connection_type)
[2019-05-23 00:00:00,844] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1924, in request
[2019-05-23 00:00:00,844] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     cachekey,
[2019-05-23 00:00:00,844] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1595, in _request
[2019-05-23 00:00:00,844] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     conn, request_uri, method, body, headers
[2019-05-23 00:00:00,845] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1502, in _conn_request
[2019-05-23 00:00:00,845] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     conn.request(method, request_uri, body, headers)
[2019-05-23 00:00:00,845] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1239, in request
[2019-05-23 00:00:00,846] {models.py:1827} INFO - Marking task as FAILED.
[2019-05-23 00:00:00,847] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self._send_request(method, url, body, headers, encode_chunked)
[2019-05-23 00:00:00,847] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1285, in _send_request
[2019-05-23 00:00:00,847] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self.endheaders(body, encode_chunked=encode_chunked)
[2019-05-23 00:00:00,848] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1234, in endheaders
[2019-05-23 00:00:00,848] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self._send_output(message_body, encode_chunked=encode_chunked)
[2019-05-23 00:00:00,848] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1065, in _send_output
[2019-05-23 00:00:00,849] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self.send(chunk)
[2019-05-23 00:00:00,849] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 986, in send
[2019-05-23 00:00:00,850] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self.sock.sendall(data)
[2019-05-23 00:00:00,850] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/ssl.py", line 975, in sendall
[2019-05-23 00:00:00,850] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     v = self.send(byte_view[count:])
[2019-05-23 00:00:00,851] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/ssl.py", line 944, in send
[2019-05-23 00:00:00,852] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return self._sslobj.write(data)
[2019-05-23 00:00:00,852] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/ssl.py", line 642, in write
[2019-05-23 00:00:00,856] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return self._sslobj.write(data)
[2019-05-23 00:00:00,857] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml OverflowError: string longer than 2147483647 bytes
[2019-05-23 00:00:00,857] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml [2019-05-23 00:00:00,846] {models.py:1827} INFO - Marking task as FAILED.
[2019-05-23 00:00:00,938] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml Traceback (most recent call last):
[2019-05-23 00:00:00,943] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/bin/airflow", line 7, in <module>
[2019-05-23 00:00:00,950] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     exec(compile(f.read(), __file__, 'exec'))
[2019-05-23 00:00:00,954] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/bin/airflow", line 32, in <module>
[2019-05-23 00:00:00,956] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     args.func(args)
[2019-05-23 00:00:00,956] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/utils/cli.py", line 74, in wrapper
[2019-05-23 00:00:00,957] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return f(*args, **kwargs)
[2019-05-23 00:00:00,957] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 526, in run
[2019-05-23 00:00:00,958] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     _run(args, dag, ti)
[2019-05-23 00:00:00,961] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/bin/cli.py", line 445, in _run
[2019-05-23 00:00:00,961] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     pool=args.pool,
[2019-05-23 00:00:00,962] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/utils/db.py", line 73, in wrapper
[2019-05-23 00:00:00,962] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return func(*args, **kwargs)
[2019-05-23 00:00:00,963] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
[2019-05-23 00:00:00,964] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     result = task_copy.execute(context=context)
[2019-05-23 00:00:00,964] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execute
[2019-05-23 00:00:00,964] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return_value = self.execute_callable()
[2019-05-23 00:00:00,964] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callable
[2019-05-23 00:00:00,964] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return self.python_callable(*self.op_args, **self.op_kwargs)
[2019-05-23 00:00:00,964] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/home/airflow/gcs/dags/process_sra_from_ncbi.py", line 62, in execute
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     mime_type='application/json'
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcs_hook.py", line 239, in upload
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     .execute(num_retries=num_retries)
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return wrapped(*args, **kwargs)
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 846, in execute
[2019-05-23 00:00:00,965] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     method=str(self.method), body=self.body, headers=self.headers)
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 164, in _retry_request
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     resp, content = http.request(uri, method, *args, **kwargs)
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/google_auth_httplib2.py", line 198, in request
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     uri, method, body=body, headers=request_headers, **kwargs)
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 155, in new_request
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     redirections, connection_type)
[2019-05-23 00:00:00,966] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1924, in request
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     cachekey,
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1595, in _request
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     conn, request_uri, method, body, headers
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1502, in _conn_request
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     conn.request(method, request_uri, body, headers)
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1239, in request
[2019-05-23 00:00:00,967] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self._send_request(method, url, body, headers, encode_chunked)
[2019-05-23 00:00:00,970] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1285, in _send_request
[2019-05-23 00:00:01,355] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self.endheaders(body, encode_chunked=encode_chunked)
[2019-05-23 00:00:01,355] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1234, in endheaders
[2019-05-23 00:00:01,355] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self._send_output(message_body, encode_chunked=encode_chunked)
[2019-05-23 00:00:01,356] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 1065, in _send_output
[2019-05-23 00:00:01,356] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self.send(chunk)
[2019-05-23 00:00:01,356] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/http/client.py", line 986, in send
[2019-05-23 00:00:01,356] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     self.sock.sendall(data)
[2019-05-23 00:00:01,356] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/ssl.py", line 975, in sendall
[2019-05-23 00:00:01,357] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     v = self.send(byte_view[count:])
[2019-05-23 00:00:01,357] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/ssl.py", line 944, in send
[2019-05-23 00:00:01,357] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return self._sslobj.write(data)
[2019-05-23 00:00:01,357] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml   File "/opt/python3.6/lib/python3.6/ssl.py", line 642, in write
[2019-05-23 00:00:01,357] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml     return self._sslobj.write(data)
[2019-05-23 00:00:01,357] {base_task_runner.py:101} INFO - Job 65: Subtask process_sra_run_xml OverflowError: string longer than 2147483647 bytes
[2019-05-23 00:00:05,076] {helpers.py:250} INFO - Sending Signals.SIGTERM to GPID 1290
[2019-05-23 00:00:05,651] {helpers.py:232} INFO - Process psutil.Process(pid=1290, status='terminated') (1290) terminated with exit code -15
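
For readers following the trace: the POST above uses `uploadType=media`, and the failure surfaces in `ssl.py` when the whole request body is handed to a single write. The figure 2147483647 is 2**31 - 1, so a 4+ GB body cannot go out in one piece. The sketch below is only arithmetic, not the hook's implementation; `plan_chunks`, the 256 MiB default, and the 4 GiB file size are assumptions chosen for illustration. It shows how a payload of that size splits into ranges that each stay under the limit, with chunk sizes kept to a multiple of 256 KiB as GCS resumable uploads require.

```python
SSL_WRITE_LIMIT = 2**31 - 1        # 2147483647, the figure in the OverflowError
GCS_CHUNK_MULTIPLE = 256 * 1024    # resumable-upload chunks must be a multiple of 256 KiB


def plan_chunks(total_size, chunk_size=256 * 1024 * 1024):
    """Return inclusive (start, end) byte ranges that cover total_size bytes.

    Hypothetical helper for illustration only; it performs no I/O.
    """
    if chunk_size <= 0 or chunk_size % GCS_CHUNK_MULTIPLE != 0:
        raise ValueError("chunk_size must be a positive multiple of 256 KiB")
    if chunk_size > SSL_WRITE_LIMIT:
        raise ValueError("chunk_size would still overflow a single SSL write")
    return [
        (start, min(start + chunk_size, total_size) - 1)
        for start in range(0, total_size, chunk_size)
    ]


file_size = 4 * 1024**3                 # assumed 4 GiB payload, as in the post
print(file_size > SSL_WRITE_LIMIT)      # True: one write cannot carry the body
ranges = plan_chunks(file_size)
print(len(ranges))                      # 16 requests of 256 MiB each
print(ranges[0], ranges[-1])
```

Any chunk size that is a multiple of 256 KiB and below the limit would do; 256 MiB is used here only because it divides the assumed file size evenly.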

Szasza Palmer

Jun 16, 2019, 8:43:19 PM
to cloud-composer-discuss
Hi Sean,

I am not sure which Cloud Composer and Airflow versions you are using, but the latest Airflow version available in Cloud Composer as of now is 1.10.2, which requires multipart to be set to True explicitly for large uploads: https://github.com/apache/airflow/blob/1.10.2/airflow/contrib/hooks/gcs_hook.py#L214

I hope this helps.
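
A minimal sketch of what that suggestion might look like in a DAG callable, assuming the Airflow 1.10.2 `GoogleCloudStorageHook.upload` signature linked above (which accepts a `multipart` keyword). The wrapper name `upload_large_file` and the local path in the usage comment are hypothetical; the hook is passed in rather than imported so the snippet does not depend on Airflow being installed. On Airflow releases where `multipart` has been deprecated or removed, this call would not apply.

```python
def upload_large_file(hook, bucket, object_name, filename,
                      mime_type="application/json"):
    """Upload a local file with multipart explicitly enabled.

    `hook` is assumed to be a GoogleCloudStorageHook from Airflow 1.10.2,
    whose upload() accepts multipart=True to split the upload into
    several HTTP requests instead of one large body.
    """
    return hook.upload(
        bucket=bucket,
        object=object_name,
        filename=filename,
        mime_type=mime_type,
        multipart=True,  # per the 1.10.2 hook, not enabled by default
    )


# Usage on a Composer worker (bucket and object taken from the log above,
# the local path is a placeholder):
#   from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
#   upload_large_file(GoogleCloudStorageHook(),
#                     "omicidx-cancerdatasci-org", "etl/run.json", "/tmp/run.json")
```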

Sergei Guschin

Aug 9, 2019, 1:39:43 PM
to cloud-composer-discuss
I am sporadically getting the same error during a multipart download from GCS on composer-1.7.2-airflow-1.10.2. The download is implemented as a hook extension; the code follows the error log:


[2019-08-09 15:23:33,955] {base_task_runner.py:101} INFO - Job 75403: Subtask parse_file [2019-08-09 15:23:33,955] {discovery.py:873} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/my_bucket_name/o/file_1gb_size.tar.gz%2Ffile_1gb_size.tar.gz?alt=media
[2019-08-09 15:23:33,956] {base_task_runner.py:101} INFO - Job 75403: Subtask parse_file [2019-08-09 15:23:33,956] {gcs_hook_extension.py:40} INFO - Using GCS Hook Extension with chunk size 1073741824
[2019-08-09 15:23:33,956] {base_task_runner.py:101} INFO - Job 75403: Subtask parse_file [2019-08-09 15:23:33,956] {_helpers.py:129} WARNING - __init__() takes at most 3 positional arguments (4 given)
[2019-08-09 15:23:41,230] {helpers.py:250} INFO - Sending Signals.SIGTERM to GPID 113245
[2019-08-09 15:23:41,230] {models.py:1641} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-08-09 15:23:41,230] {base_task_runner.py:101} INFO - Job 75403: Subtask parse_file [2019-08-09 15:23:41,230] {models.py:1641} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-08-09 15:23:41,257] {models.py:1796} ERROR - Task received SIGTERM signal
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/plugins/operators/gcs_dlfpu.py", line 111, in execute
    self._download_files(filtered_files, input_directory
  File "/home/airflow/gcs/plugins/operators/gcs_dlfpu.py", line 195, in _download_files
    chunksize=1024 * 1024 * 1024
  File "/home/airflow/gcs/plugins/hooks/gcs_hook_extension.py", line 48, in download_in_chunks
    status, done = downloader.next_chunk()
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 686, in next_chunk
    'GET', headers=headers)
  File "/opt/python3.6/lib/python3.6/site-packages/googleapiclient/http.py", line 164, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/google_auth_httplib2.py", line 198, in request
    uri, method, body=body, headers=request_headers, **kwargs)
  File "/usr/local/lib/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 155, in new_request
    redirections, connection_type)
  File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1924, in request
    cachekey,
  File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1595, in _request
    conn, request_uri, method, body, headers
  File "/opt/python3.6/lib/python3.6/site-packages/httplib2/__init__.py", line 1562, in _conn_request
    content = response.read()
  File "/opt/python3.6/lib/python3.6/http/client.py", line 462, in read
    s = self._safe_read(self.length)
  File "/opt/python3.6/lib/python3.6/http/client.py", line 612, in _safe_read
    chunk = self.fp.read(min(amt, MAXAMOUNT))
  File "/opt/python3.6/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
  File "/opt/python3.6/lib/python3.6/ssl.py", line 1012, in recv_into
    return self.read(nbytes, buffer)
  File "/opt/python3.6/lib/python3.6/ssl.py", line 874, in read
    return self._sslobj.read(len, buffer)
  File "/opt/python3.6/lib/python3.6/ssl.py", line 631, in read
    v = self._sslobj.read(len, buffer)
  File "/usr/local/lib/airflow/airflow/models.py", line 1643, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal



multipart download code:
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from googleapiclient.http import MediaIoBaseDownload, DEFAULT_CHUNK_SIZE


class GoogleCloudStorageHookExtended(GoogleCloudStorageHook):
    """
    Interact with Google Cloud Storage. This hook uses the Google Cloud Platform
    connection.
    """

    def __init__(self, google_cloud_storage_conn_id='google_cloud_default', delegate_to=None):
        super(GoogleCloudStorageHookExtended, self).__init__(google_cloud_storage_conn_id,
                                                             delegate_to)

    # pylint:disable=redefined-builtin
    def download_in_chunks(self, bucket, object, filename, chunksize=None):
        """
        Get a file from Google Cloud Storage - download chunk by chunk and don't return bytes to
        caller. This allows us to download large files without blowing out local memory.

        :param bucket: The bucket to fetch from.
        :type bucket: string
        :param object: The object to fetch.
        :type object: string
        :param filename: A local file path where the file should be written to.
        :type filename: string
        :param chunksize: Bytes to request per chunk; defaults to DEFAULT_CHUNK_SIZE.
        :type chunksize: int
        """
        service = self.get_conn()
        request = service \
            .objects() \
            .get_media(bucket=bucket, object=object)

        if chunksize is None:
            chunksize = DEFAULT_CHUNK_SIZE

        self.log.info("Using GCS Hook Extension with chunk size {}".format(chunksize))

        # Write the file to the local file path, one chunk per request.
        with open(filename, 'wb') as file_fd:
            downloader = MediaIoBaseDownload(file_fd, request, chunksize)

            done = False
            while done is False:
                status, done = downloader.next_chunk()
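
One observation on the loop above: the log reports a chunk size of 1073741824 bytes for a file described as about 1 GB, so the download may complete in one or two requests, each read fully into memory by `response.read()`. The sketch below does not diagnose the SIGTERM; it only shows, under assumptions, how the same loop could log progress per chunk so the task log records how far a download got. `drain_downloader` is a hypothetical helper; it relies on `next_chunk(num_retries=...)` and `status.progress()` as provided by googleapiclient's `MediaIoBaseDownload`, but accepts any object with that shape.

```python
import logging

log = logging.getLogger(__name__)


def drain_downloader(downloader, num_retries=0):
    """Call next_chunk() until the download is done, logging progress.

    `downloader` is assumed to behave like googleapiclient's
    MediaIoBaseDownload: next_chunk() returns (status, done) and
    status.progress() returns a fraction between 0 and 1.
    Returns the number of chunks fetched.
    """
    chunks = 0
    done = False
    while not done:
        status, done = downloader.next_chunk(num_retries=num_retries)
        chunks += 1
        if status is not None:
            log.info("Chunk %d fetched, %.0f%% complete",
                     chunks, status.progress() * 100)
    return chunks
```

With a smaller `chunksize` passed to `MediaIoBaseDownload` (the 1 GiB value in the trace is the caller's choice), each request holds less data in memory and the log shows finer-grained progress.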


Wilson Lian

Aug 12, 2019, 7:23:16 PM
to Sergei Guschin, cloud-composer-discuss
Hi Sergei,

Looks like you've got a custom hook implementation as a plugin, and that's throwing the error. Do you get an error using the built-in GCSHook in Airflow 1.10.2?

Sergei Guschin

Aug 13, 2019, 8:38:11 AM
to cloud-composer-discuss
Hi Wilson,

The built-in hook does not support downloading in chunks.

Sergei