Hi Everyone,
I have a workflow that has about 16 ExternalTasks that output to a GCS bucket. Some of the tasks are getting errors while being scheduled. It looks like this:
```
WARNING - Will not run SalesforceReplicateObjectToGCS(SOBJECT=AccountHistory) or any dependencies due to error in complete() method:
Traceback (most recent call last):
File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\worker.py", line 401, in check_complete
is_complete = task.complete()
File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\task.py", line 571, in complete
return all(map(lambda output: output.exists(), outputs))
File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\task.py", line 571, in <lambda>
return all(map(lambda output: output.exists(), outputs))
File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\target.py", line 251, in exists
return self.fs.exists(path)
File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\contrib\gcs.py", line 196, in exists
if self._obj_exists(bucket, obj):
File "C:\Users\FriscianViales\AppData\Roaming\Python\Python38\site-packages\luigi\contrib\gcs.py", line 138, in _obj_exists
self.client.objects().get(bucket=bucket, object=obj).execute()
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\_helpers.py", line 134, in positional_wrapper
return wrapped(*args, **kwargs)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\http.py", line 900, in execute
resp, content = _retry_request(
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\http.py", line 204, in _retry_request
raise exception
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\googleapiclient\http.py", line 177, in _retry_request
resp, content = http.request(uri, method, *args, **kwargs)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google_auth_httplib2.py", line 209, in request
self.credentials.before_request(self._request, method, uri, request_headers)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\auth\credentials.py", line 133, in before_request
self.refresh(request)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\oauth2\service_account.py", line 361, in refresh
access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\oauth2\_client.py", line 153, in jwt_grant
response_data = _token_endpoint_request(request, token_uri, body)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google\oauth2\_client.py", line 105, in _token_endpoint_request
response = request(method="POST", url=token_uri, headers=headers, body=body)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\google_auth_httplib2.py", line 119, in __call__
response, data = self.http.request(
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1708, in request
(response, content) = self._request(
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1424, in _request
(response, content) = self._conn_request(conn, request_uri, method, body, headers)
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1346, in _conn_request
conn.connect()
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1182, in connect
raise socket_err
File "C:\Users\FriscianViales\anaconda3\envs\Luigi-SalesforcePipelines\lib\site-packages\httplib2\__init__.py", line 1136, in connect
sock.connect((self.host, self.port))
TimeoutError: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
```
Is there a way to authenticate with GCP only once, as opposed to doing it on every single task? I think that is what is happening, and it is why some of the tasks fail to get scheduled.
I am using a service account set in an environment variable like this:
```
# File configurations:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = config.get('GCP_KEYS')
```
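To make "authenticate only once" concrete, here is a minimal sketch of one possible approach: build a single `GCSClient` per process and pass it to every `GCSTarget` through its `client` argument, so the targets share one set of credentials instead of each refreshing its own token. The names `shared_client`, `gcs_target`, and `_build_gcs_client` are hypothetical helpers, not part of Luigi, and whether this actually removes the timeouts in this pipeline is an assumption that has not been verified.

```python
import functools


def _build_gcs_client():
    """Create a GCSClient using the default credentials.

    Assumes GOOGLE_APPLICATION_CREDENTIALS is already set, as in the
    snippet above. Imported lazily so this module loads without luigi.
    """
    from luigi.contrib.gcs import GCSClient
    return GCSClient()


@functools.lru_cache(maxsize=None)
def shared_client(factory=_build_gcs_client):
    """Return one client per process; the factory runs only on the first call."""
    return factory()


def gcs_target(path):
    """Hypothetical helper: a GCSTarget that reuses the shared client."""
    from luigi.contrib.gcs import GCSTarget
    return GCSTarget(path, client=shared_client())
```

Each task's `output()` would then return `gcs_target('gs://...')` instead of constructing `GCSTarget` directly. Note that the cache is per process, so with several worker processes each one would still authenticate once on its own.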
Appreciate some help with this!