Getting TimeoutError while scheduling tasks


Friscian Viales

Aug 18, 2021, 11:37:03 PM
to Luigi
Hi Everyone, 

I have a workflow with about 16 ExternalTasks that output to a GCS bucket. Some of the tasks fail while being scheduled, with errors like this: 

```
WARNING - Will not run SalesforceReplicateObjectToGCS(SOBJECT=AccountHistory) or any dependencies due to error in complete() method:
Traceback (most recent call last):
  File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\worker.py", line 401, in check_complete
    is_complete = task.complete()
  File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\task.py", line 571, in complete
    return all(map(lambda output: output.exists(), outputs))
  File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\task.py", line 571, in <lambda>
    return all(map(lambda output: output.exists(), outputs))
  File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\target.py", line 251, in exists
    return self.fs.exists(path)
  File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\contrib\gcs.py", line 196, in exists
    if self._obj_exists(bucket, obj):
  File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\contrib\gcs.py", line 138, in _obj_exists
    self.client.objects().get(bucket=bucket, object=obj).execute()
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\_helpers.py", line 134, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\http.py", line 900, in execute
    resp, content = _retry_request(
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\http.py", line 204, in _retry_request
    raise exception
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\http.py", line 177, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google_auth_httplib2.py", line 209, in request
    self.credentials.before_request(self._request, method, uri, request_headers)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\auth\credentials.py", line 133, in before_request
    self.refresh(request)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\oauth2\service_account.py", line 361, in refresh
    access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\oauth2\_client.py", line 153, in jwt_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\oauth2\_client.py", line 105, in _token_endpoint_request
    response = request(method="POST", url=token_uri, headers=headers, body=body)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google_auth_httplib2.py", line 119, in __call__
    response, data = self.http.request(
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1708, in request
    (response, content) = self._request(
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1424, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1346, in _conn_request
    conn.connect()
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1182, in connect
    raise socket_err
  File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1136, in connect
    sock.connect((self.host, self.port))
TimeoutError: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
```
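Whatever the root cause, the log shows `complete()` raising a transient connect timeout, which luigi reports as "Will not run ... due to error in complete() method". One mitigation not discussed in this thread is to retry the existence check with exponential backoff. Below is a minimal generic sketch under that assumption; `call_with_retry` and its parameters are hypothetical names, and the luigi usage shown in the trailing comment is an untested illustration.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: tuple = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying transient network errors with exponential backoff.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last error once all attempts are used up. Errors not listed
    in retry_on propagate immediately.
    """
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")


# Hypothetical use inside a luigi task (not executed here):
#     def complete(self):
#         return call_with_retry(super().complete)
```

Injecting `sleep` keeps the helper testable without real waiting; in a pipeline you would leave the default `time.sleep`.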

Is there a way to authenticate with GCP only once instead of on every single task? I think that is what is happening, and that is why some of them fail to get scheduled.

I am using a service account key, referenced through an environment variable, like this: 

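```python
import os
# File configurations: point Google's auth libraries at the service-account key file.
# `config` is this pipeline's own settings object (not shown in the post).
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('GCP_KEYS')
```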
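If I read `luigi.contrib.gcs` correctly, a `GCSTarget` built without a `client` argument creates its own `GCSClient`, so every task's target gets its own credentials object and its own token request. One way to authenticate only once per process is to build a single client and pass it to every target. The sketch below shows the caching part in plain Python; `shared_gcs_client`, `FakeClient` and the bucket path in the comment are hypothetical, and `FakeClient` only stands in for `GCSClient` so the example runs without GCP.

```python
import functools


class FakeClient:
    """Hypothetical stand-in for luigi.contrib.gcs.GCSClient; counts constructions."""
    instances = 0

    def __init__(self):
        FakeClient.instances += 1


@functools.lru_cache(maxsize=None)
def shared_gcs_client():
    """Build the client once per process and return the same object afterwards."""
    # In a real pipeline this would be:  return GCSClient()
    return FakeClient()


# Hypothetical use in each task (not executed here):
#     def output(self):
#         return GCSTarget(f"gs://my-bucket/{self.SOBJECT}.csv",
#                          client=shared_gcs_client())
```

`functools.lru_cache(maxsize=None)` is used rather than `functools.cache` because the traceback shows Python 3.8. The underlying httplib2 connections are generally not thread-safe, so this assumes completeness checks are not run concurrently in threads.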

I'd appreciate some help with this! 

Lars Albertsson

Aug 19, 2021, 11:55:39 AM
to Friscian Viales, Luigi
I recall creating a custom GCSClient that reused a single connection at some point. IIRC, I discarded it after discovering that the GCP Python library has internal connection pooling and reuses authenticated connections. It has been a couple of years, however, so I might be wrong.
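The traceback above shows the reuse Lars mentions at the credentials level: google-auth's `before_request` calls `refresh` (the JWT grant that timed out) only when the cached token is not valid. That reuse only helps if the same credentials object is shared, which is why one shared client matters. Below is a simplified, hypothetical illustration of that caching, not google-auth's actual code; `CachedToken`, `fetch` and `skew` are made-up names, and `fetch` stands in for the real token request.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Fetch an access token once and reuse it until shortly before it expires.

    fetch() is a hypothetical callable returning (token, lifetime_seconds);
    clock is injectable so the behaviour can be tested without waiting.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        clock: Callable[[], float] = time.monotonic,
        skew: float = 60.0,
    ):
        self._fetch = fetch
        self._clock = clock
        self._skew = skew  # refresh this many seconds before the real expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0

    @property
    def valid(self) -> bool:
        return self._token is not None and self._clock() < self._expires_at - self._skew

    def get(self) -> str:
        # Only hit the token endpoint when there is no usable cached token.
        if not self.valid:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = self._clock() + lifetime
        return self._token
```

With one shared instance, sixteen tasks would trigger one token request per token lifetime instead of one each.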

--
You received this message because you are subscribed to the Google Groups "Luigi" group.
To view this discussion on the web visit https://groups.google.com/d/msgid/luigi-user/b8557b60-1725-4d77-8690-11ea6eee1c87n%40googlegroups.com.