Triggering dags via Python Cloud Functions (triggered via PubSub)

3,166 views
Skip to first unread message

rmur...@merkleinc.com

unread,
Sep 7, 2018, 10:48:26 AM9/7/18
to cloud-composer-discuss
Is anyone out there triggering DAGs via Python Cloud Functions? If so, can you share your code snippets?

Why?
Because when I use the node.js example code here: https://cloud.google.com/composer/docs/how-to/using/triggering-with-gcf
It works most of the time, but I need better error capturing and I am Node.js illiterate (note: the Composer API returns some errors when you drop a couple dozen messages into the Pub/Sub topic quickly — something about an "execution_date collision").

I've modeled a function on the following code, but when I trigger it via Pub/Sub and Cloud Functions it crashes with HTTP 400 errors. The code is below (without our specific environment values) if anyone wants to take a look.

import base64
import logging
import json
import google.auth
import google.auth.app_engine
import google.auth.compute_engine.credentials
import google.auth.iam
from google.auth.transport.requests import Request
import google.oauth2.credentials
import google.oauth2.service_account
import requests
import requests_toolbelt.adapters.appengine



# Google endpoints used to mint an OpenID Connect token for IAP. They are
# referenced by make_iap_request / get_google_open_id_connect_token but were
# missing from the snippet (NameError otherwise).
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

# OAuth client ID of the IAP proxy in front of the Composer web server
# (used as the token's target_audience). Placeholder - set your own value.
CLIENT_ID = 'XXXXXXXXXXXX.apps.googleusercontent.com'

WEBSERVER_ID = 'YYYYYYYYYYYYYYYYYYYY'
DAG_NAME = 'zzzzzzzzzzzzzzzzzz'
# Airflow experimental REST endpoint that hello_pubsub POSTs to in order to
# trigger DAG_NAME.
WEBSERVER_URL = 'https://' + WEBSERVER_ID + '.appspot.com/api/experimental/dags/' + DAG_NAME + '/dag_runs'

# note - I tried GET and POST. POST is what triggers a DAG run, but the run's
# options must travel in a JSON request body (json=...), not in the URL.
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Make a request to an application protected by Identity-Aware Proxy.

    Args:
      url: The IAP-protected URL to call.
      client_id: The OAuth client ID used by Identity-Aware Proxy; it becomes
        the ``target_audience`` of the OpenID Connect token.
      method: HTTP method ('GET', 'POST', ...).
      **kwargs: Passed through to ``requests.request`` (e.g. ``json=``).
        If no ``timeout`` is given, 90 seconds is used.

    Returns:
      The response body text on an HTTP 200 response.

    Raises:
      Exception: if the credentials are not a service account, on a 403
        (service account not allowed through IAP), or on any other non-200
        response.
    """
    # Fail fast instead of hanging until the Cloud Function itself times out.
    kwargs.setdefault('timeout', 90)

    bootstrap_credentials, _ = google.auth.default(scopes=[IAM_SCOPE])

    if isinstance(bootstrap_credentials, google.oauth2.credentials.Credentials):
        # End-user credentials are rejected; a service account identity is required.
        raise Exception('make_iap_request is only supported for service accounts.')
    elif isinstance(bootstrap_credentials, google.auth.app_engine.Credentials):
        requests_toolbelt.adapters.appengine.monkeypatch()

    # For service accounts using the Compute Engine metadata service,
    # service_account_email isn't available until refresh is called.
    bootstrap_credentials.refresh(Request())

    signer_email = bootstrap_credentials.service_account_email
    logging.info("signer_email: %s", signer_email)

    if isinstance(bootstrap_credentials, google.auth.compute_engine.credentials.Credentials):
        # Metadata-server credentials expose no local signer, so signing is
        # delegated to the IAM API.
        signer = google.auth.iam.Signer(Request(), bootstrap_credentials, signer_email)
    else:
        signer = bootstrap_credentials.signer
    logging.info("signer: %s", signer)

    # The target_audience claim makes the token endpoint hand back an
    # 'id_token' for the IAP client (read in get_google_open_id_connect_token).
    service_account_credentials = google.oauth2.service_account.Credentials(
        signer, signer_email, token_uri=OAUTH_TOKEN_URI,
        additional_claims={'target_audience': client_id})
    logging.info("service_account_credentials: %s", service_account_credentials)

    google_open_id_connect_token = get_google_open_id_connect_token(service_account_credentials)
    # The token is a bearer credential for the IAP-protected web server;
    # never write it to the logs.
    logging.info("Obtained OpenID Connect token for %s", signer_email)

    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(google_open_id_connect_token)},
        **kwargs)

    logging.info("resp.status_code: %s", resp.status_code)
    logging.info("resp.headers: %s", resp.headers)
    logging.info("resp.text: %s", resp.text)

    if resp.status_code == 403:
        raise Exception('Service account {} does not have permission to access the IAP-protected application.'.format(signer_email))
    elif resp.status_code != 200:
        raise Exception('Bad response from application: {!r} / {!r} / {!r}'.format(resp.status_code, resp.headers, resp.text))
    else:
        return resp.text


def get_google_open_id_connect_token(service_account_credentials):
    """Swap a self-signed service-account JWT for a Google OpenID Connect ID token.

    The JWT-bearer grant is posted to OAUTH_TOKEN_URI and the ``id_token``
    field of the response is returned.

    NOTE: this relies on private google-auth helpers (underscore-prefixed
    names), which may change between library versions.
    """
    signed_jwt = service_account_credentials._make_authorization_grant_assertion()

    transport = google.auth.transport.requests.Request()
    grant = dict(
        assertion=signed_jwt,
        grant_type=google.oauth2._client._JWT_GRANT_TYPE,
    )
    reply = google.oauth2._client._token_endpoint_request(transport, OAUTH_TOKEN_URI, grant)
    return reply['id_token']


def hello_pubsub(event, context):
    """Background Cloud Function: trigger the Composer DAG from a Pub/Sub message.

    The message payload must be JSON. It is forwarded to the DAG run as its
    ``conf``.

    Args:
      event: Pub/Sub event dict; ``event['data']`` holds the base64-encoded
        message body.
      context: Cloud Functions event metadata (unused).

    Raises:
      Exception: propagated from make_iap_request on a non-200 response, so
        the invocation fails instead of silently succeeding. Presumably this
        lets Cloud Functions retries (if enabled) redeliver the message -
        confirm against your function's retry setting.
    """
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    # json.loads already accepts newlines between tokens; this only matters
    # for raw newlines inside string values, which strict JSON rejects.
    pubsub_message = pubsub_message.replace('\n', ' ')
    my_data = json.loads(pubsub_message)

    # The Airflow experimental dag_runs endpoint reads its options from a JSON
    # request body. Sending them as URL query parameters (params=...) leaves
    # the body empty and the web server answers "400 Bad Request".
    # - conf is sent as a JSON string, a form that Airflow 1.x json.loads into
    #   dag_run.conf (confirm for your Composer image).
    # - replace_microseconds='false' keeps microseconds in the generated
    #   execution_date so bursts of messages do not collide; versions that do
    #   not know the key ignore it.
    output = make_iap_request(
        WEBSERVER_URL, CLIENT_ID, 'POST',
        json={'conf': json.dumps(my_data), 'replace_microseconds': 'false'})
    logging.info("output is: %s", output)
    
    

Example message content; note the newlines (\n) in the content, which the function replaces with spaces:
{
"var1": "Rich",
"var2": "Richard"
}



Crash details:

Traceback (most recent call last): 
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 281, 
in run_background_function _function_handler.invoke_user_function(event_object) 
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 171, 
in invoke_user_function return call_user_function(request_or_event) 
File "/env/local/lib/python3.7/site-packages/google/cloud/functions_v1beta2/worker.py", line 168, 
in call_user_function event_context.Context(**request_or_event.context)) File "/user_code/main.py", line 149, 
in hello_pubsub output = make_iap_request(WEBSERVER_URL, CLIENT_ID, 'POST', params=my_data) 
File "/user_code/main.py", line 96, 
in make_iap_request raise Exception('Bad response from application: 
{!r} / {!r} / {!r}'.format(resp.status_code, resp.headers, resp.text)) 
Exception: Bad response from application: 400 / {'Date': 'Fri, 07 Sep 2018 13:54:55 GMT', 
'Content-Type': 'text/html', 'Content-Length': '192', 'Server': 'gunicorn/19.8.1', 'Via': 
'1.1 google', 'Alt-Svc': 'quic=":443"; ma=2592000; v="44,43,39,35"'} / 
'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n<title>400 Bad Request</title>\n
<h1>Bad Request</h1>\n
<p>The browser (or proxy) sent a request that this server could not understand.</p>\n'



Alternative? If someone out there is a Node.js expert and can tweak the original JavaScript to not acknowledge the Pub/Sub message when the Composer response code is not 200, that would be great too.


rich.m...@cloudtp.com

unread,
Sep 7, 2018, 10:54:20 AM9/7/18
to cloud-composer-discuss
I just finished doing this: I altered the sample to add error handling (and logging for visibility, especially for auth failures). I'll post the code here for you in a bit. It can be deployed to trigger from GCS, Pub/Sub, or HTTP.

It is a roughish working version.

Tim Swast

unread,
Sep 7, 2018, 12:11:25 PM9/7/18
to rich.m...@cloudtp.com, cloud-composer-discuss
@ rmurnane : The Python version is great! Would you mind sending a pull request to https://github.com/GoogleCloudPlatform/python-docs-samples (directory functions/composer-storage-trigger/) with the Python version? If you do, I can include it in the docs. Tag me @tswast in the pull request description, and I can get it merged.

@ rich.mcaneny Likewise, if you could send a pull request to https://github.com/GoogleCloudPlatform/nodejs-docs-samples/tree/master/functions/composer-storage-trigger with your improvements to the Node.js sample?

The information contained in this e-mail message may be privileged and confidential information and is intended only for the use of the individual and/or entity identified in the alias address of this message. If the reader of this message is not the intended recipient, or an employee or agent responsible to deliver it to the intended recipient, you are hereby notified that you have received this in error and that any review, dissemination, distribution, or copying of this message is strictly prohibited. If you have received this message in error, please notify us immediately by telephone or return e-mail and delete the original message from your system.

--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/d4314204-cd84-4557-b64b-f844d7565e3f%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
  •  Tim Swast
  •  Software Friendliness Engineer
  •  Google Cloud Developer Relations
  •  Seattle, WA, USA

rich.m...@cloudtp.com

unread,
Sep 7, 2018, 1:59:30 PM9/7/18
to cloud-composer-discuss
I will try to do it this weekend.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-discuss+unsub...@googlegroups.com.
Message has been deleted

francesc...@weightwatchers.com

unread,
Sep 27, 2018, 1:21:55 PM9/27/18
to cloud-composer-discuss
Has anyone had luck with this in Python? I am trying to do this and have the same code.
I can successfully call GETs but I cannot POST to trigger a dag.

 def trigger_dag():

    webserver_id = "ZZZZZZZZ"
    dag_name = "trigger_dag_ex"
    task_name = "echo"

    ### DOES NOT WORK. I get 400 : The browser (or proxy) sent a request that this server could not understand.
    webserver_url = "https://{0}-tp.appspot.com/api/experimental/dags/{1}/dag_runs".format(webserver_id, dag_name)
    method="POST"
    ####

    ### The following however work and I get 200s
    #webserver_url = "https://{0}-tp.appspot.com/api/experimental/dags/{1}/tasks/{2}".format(webserver_id, dag_name,task_name)
    #webserver_url = "https://{0}-tp.appspot.com/api/experimental/pools".format(webserver_id)
    #method="GET"
    ######

    return make_iap_request(webserver_url, client_id, method=method)

On Friday, September 7, 2018 at 10:48:26 AM UTC-4, rmur...@merkleinc.com wrote:

Rich Murnane

unread,
Sep 27, 2018, 2:14:17 PM9/27/18
to francesc...@weightwatchers.com, cloud-composer-discuss

Hi Francesco,

 

I kind of gave up, changed direction and we’re now using the attached in our app engine code to invoke the dags, which is working OK. 

Maybe you can get this working with Cloud Functions (Python); we decided that removing one piece of the puzzle made good sense for us.

 

This is probably too much info, but…

 

Our process was:

file lands, pubSub message, app engine does stuff like logging/emails /etc., then places on different pubSub topic, cloud functions see msg, trigger dag

 

now it’s:

file lands, Pub/Sub message, App Engine does stuff like logging/emails/etc., then triggers the DAG

 

 

snippet to run the code is below.

 

 

import composerTools

 

# Example driver for composerTools.run_dag (values are placeholders).
client_id = "xxxxxxxxxx.apps.googleusercontent.com"
webserver_id = "xxxxxxxxxxxxxxxxxxxxx-tp"
dag_name = "msg_to_test"

# Key/value pairs handed to composerTools.run_dag; presumably forwarded as
# the DAG run's conf - see composerTools for the exact contract.
data_dict = {
    'variable1': 'value1',
    'variable2': 'value2',
    'variable3': 'value3',
}

x = composerTools.run_dag(client_id, webserver_id, dag_name, data_dict)
# A bare `x` only echoes in an interactive session; print it when run as a script.
print(x)

--

You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.

To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.


To post to this group, send email to cloud-compo...@googlegroups.com.

composerTools.py.txt

francesc...@weightwatchers.com

unread,
Sep 28, 2018, 2:24:17 PM9/28/18
to cloud-composer-discuss
Thanks for sharing.
I was able to make a successful iap request and trigger a dag with a cloud function.
As long as you grant the iam.serviceAccountActor and iam.serviceAccountTokenCreator roles to {PROJECT_ID}@appspot.gserviceaccount.com (the default Google App Engine service account), your code will work.

Francesco

To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-discuss+unsub...@googlegroups.com.
To post to this group, send email to cloud-comp...@googlegroups.com.

Feng Lu

unread,
Oct 1, 2018, 2:42:30 AM10/1/18
to francesco.perera, cloud-composer-discuss

To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/cc32cb5b-2b9e-4318-8ac7-663fc16be4b5%40googlegroups.com.
Message has been deleted

hicham aissaoui

unread,
Sep 25, 2019, 7:42:49 PM9/25/19
to cloud-composer-discuss
Below is an example of a Cloud Function that triggers a Composer DAG
(it still needs error handling and some optimization):


import requests
import time
import json
import base64
from datetime import datetime

# Placeholders - replace with your own project, IAP client and Composer values.
PROJECT_ID = "PROJECT_ID"
CLIENT_ID = "CLIENT_ID"  # used as the ID token's target_audience in authorize_iap
WEBSERVER_ID = "COMPOSER_SERVER_ID"
DAG_NAME = "DAG_NAME"
USER_AGENT = "gcf-event-trigger"

# Airflow experimental REST endpoint that make_iap_request POSTs new DAG runs to.
WEBSERVER_URL = "https://%s.appspot.com/api/experimental/dags/%s/dag_runs" % (
    WEBSERVER_ID,
    DAG_NAME,
)


def run_dag(context):
    """Cloud Function entry point: get an IAP ID token and trigger DAG_NAME.

    Args:
      context: Unused; whatever the Cloud Functions runtime passes in.
        NOTE(review): background (Pub/Sub, GCS) functions are invoked with
        (event, context); this one-argument signature only fits an HTTP
        trigger - confirm how the function is deployed.

    Returns:
      The result of make_iap_request (previously computed and discarded).
    """
    id_token = authorize_iap()
    return make_iap_request(id_token)


def make_iap_request(id_token, conf=None):
    """POST a new DAG run to the Airflow experimental API through IAP.

    Args:
      id_token: OpenID Connect token for the IAP client (from authorize_iap()).
      conf: Optional JSON-serialisable object stored as the DAG run's conf.
        Defaults to the original demo value ``{"test": "valueA"}``.

    Returns:
      The decoded JSON body of the web server's response.

    Raises:
      RuntimeError: if the web server does not answer with HTTP 200.
    """
    if conf is None:
        conf = {"test": "valueA"}

    payload = {
        # %S = seconds. The original lowercase %s is a non-portable glibc
        # extension that expands to the Unix epoch. %f adds microseconds so two
        # triggers in the same second do not collide on run_id.
        'run_id': 'post-triggered-run-%s' % datetime.now().strftime('%Y%m%d%H%M%S%f'),
        # conf travels as a JSON string, as in the original request.
        'conf': json.dumps(conf),
    }
    result = requests.post(
        WEBSERVER_URL,
        headers={"User-Agent": USER_AGENT, "Authorization": "Bearer {0}".format(id_token)},
        json=payload,
        timeout=90,
    )
    # On failure the body is usually an HTML error page; surface it instead of
    # failing inside .json().
    if result.status_code != 200:
        raise RuntimeError(
            "Bad response from Airflow web server: {} / {!r}".format(result.status_code, result.text))
    return result.json()


def _b64url(raw):
    """Encode bytes as unpadded base64url, the JWT segment encoding (RFC 7515)."""
    return base64.urlsafe_b64encode(raw).rstrip(b'=')


def _json_or_raise(response, what):
    """Return response.json(), or raise RuntimeError carrying the body on non-200."""
    if response.status_code != 200:
        raise RuntimeError('{} failed: {} / {!r}'.format(what, response.status_code, response.text))
    return response.json()


def authorize_iap():
    """Obtain a Google-signed OpenID Connect ID token whose audience is CLIENT_ID.

    Steps:
      1. Fetch an OAuth access token for the App Engine default service
         account from the metadata server.
      2. Build a JWT (iss = that account, target_audience = CLIENT_ID) and
         have the IAM signBlob API sign it.
      3. Exchange the signed JWT at Google's OAuth token endpoint for an ID token.

    NOTE(review): signBlob only succeeds if the caller may sign as this
    service account (e.g. the Service Account Token Creator role) - confirm
    the IAM setup.

    Returns:
      The ID token string, used as a Bearer token for the IAP-protected web server.

    Raises:
      RuntimeError: if any of the three HTTP calls does not return HTTP 200.
    """
    service_account = PROJECT_ID + "@appspot.gserviceaccount.com"

    metadata_resp = requests.get(
        "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/{0}/token".format(service_account),
        headers={'User-Agent': USER_AGENT, 'Metadata-Flavor': 'Google'},
        timeout=10,
    )
    access_token = _json_or_raise(metadata_resp, "Metadata token request")["access_token"]

    # JWT NumericDate values; integers are what Google's libraries emit.
    now = int(time.time())
    claims = {
        "iss": service_account,
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "iat": now,
        "exp": now + 60,
        "target_audience": CLIENT_ID,
    }
    # JWT segments must be base64url without padding, not standard base64.
    jwt_header = _b64url(json.dumps({"alg": 'RS256', "typ": 'JWT'}).encode('utf-8'))
    jwt_claimset = _b64url(json.dumps(claims).encode('utf-8'))
    to_sign = b'.'.join([jwt_header, jwt_claimset])

    sign_resp = requests.post(
        "https://iam.googleapis.com/v1/projects/{0}/serviceAccounts/{1}:signBlob".format(PROJECT_ID, service_account),
        # signBlob takes a JSON body whose bytesToSign is standard base64 text.
        json={"bytesToSign": base64.b64encode(to_sign).decode('ascii')},
        headers={'User-Agent': USER_AGENT, "Authorization": "Bearer {0}".format(access_token)},
        timeout=30,
    )
    blob = _json_or_raise(sign_resp, "IAM signBlob")
    # signBlob returns the signature as standard base64; re-encode it as base64url.
    jwt_signature = _b64url(base64.b64decode(blob["signature"]))
    jwt = b'.'.join([jwt_header, jwt_claimset, jwt_signature])

    token_resp = requests.post(
        'https://www.googleapis.com/oauth2/v4/token',
        data={'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'assertion': jwt.decode('ascii')},
        timeout=30,
    )
    return _json_or_raise(token_resp, "ID token exchange")["id_token"]

Leah Cole

unread,
Sep 27, 2019, 12:30:34 PM9/27/19
to cloud-composer-discuss
hi! All of these suggestions are awesome, but I wanted to give an update that as of today, we now have a Python sample in the docs!
Reply all
Reply to author
Forward
0 new messages