The information contained in this e-mail message may be privileged and confidential information and is intended only for the use of the individual and/or entity identified in the alias address of this message. If the reader of this message is not the intended recipient, or an employee or agent responsible to deliver it to the intended recipient, you are hereby notified that you have received this in error and that any review, dissemination, distribution, or copying of this message is strictly prohibited. If you have received this message in error, please notify us immediately by telephone or return e-mail and delete the original message from your system.
--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/d4314204-cd84-4557-b64b-f844d7565e3f%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-discuss+unsub...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/d4314204-cd84-4557-b64b-f844d7565e3f%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Hi Francesco,
I kind of gave up, changed direction and we’re now using the attached in our app engine code to invoke the dags, which is working OK.
Maybe you can simply get this working with cloud functions (python), we decided removing one piece of the puzzle made good sense for us.
This is probably too much info, but…
Our process was:
file lands, pubSub message, app engine does stuff like logging/emails /etc., then places on different pubSub topic, cloud functions see msg, trigger dag
now it’s:
file lands, pubSub message, app engine does stuff like logging/emails /etc.., then triggers dag
snippet to run the code is below.
import composerTools
client_id="xxxxxxxxxx.apps.googleusercontent.com"
webserver_id="xxxxxxxxxxxxxxxxxxxxx-tp"
dag_name="msg_to_test"
data_dict=None
data_dict = dict()
data_dict['variable1'] = 'value1'
data_dict['variable2'] = 'value2'
data_dict['variable3'] = 'value3'
x = composerTools.run_dag(client_id, webserver_id, dag_name, data_dict)
x
exit()
--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to
cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/3e6fa1e4-fe72-431b-9cd2-3f7c690fbc69%40googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-discuss+unsub...@googlegroups.com.
To post to this group, send email to cloud-comp...@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-comp...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/3e6fa1e4-fe72-431b-9cd2-3f7c690fbc69%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To post to this group, send email to cloud-compo...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/cc32cb5b-2b9e-4318-8ac7-663fc16be4b5%40googlegroups.com.
Below an example of cloud function that trigger a composer DAG :
(need to capture errors and optimize the script )
import requests
import time
import json
import base64
from datetime import datetime
PROJECT_ID = "PROJECT_ID"
CLIENT_ID = "CLIENT_ID"
WEBSERVER_ID = "COMPOSER_SERVER_ID"
DAG_NAME = "DAG_NAME"
USER_AGENT = "gcf-event-trigger"
WEBSERVER_URL = "https://{0}.appspot.com/api/experimental/dags/{1}/dag_runs".format(WEBSERVER_ID, DAG_NAME)
def run_dag(context):
id_token = authorize_iap()
results = make_iap_request(id_token)
def make_iap_request(id_token):
dict = {"test":"valueA"}
payload = {
'run_id': 'post-triggered-run-%s' % datetime.now().strftime('%Y%m%d%H%M%s'),
'conf': json.dumps(dict)
}
result = requests.post(WEBSERVER_URL,
headers={"User-Agent": USER_AGENT, "Authorization": "Bearer {0}".format(id_token)},
data=json.dumps(payload))
# handle exception
return result.json()
def authorize_iap():
service_account = PROJECT_ID+"@appspot.gserviceaccount.com"
req = requests.get("http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/{0}/token".format(service_account),
headers={'User-Agent': USER_AGENT, 'Metadata-Flavor': 'Google'}).json()
access_token = req["access_token"]
now = time.time()
claims = {
"iss": service_account,
"aud": "https://www.googleapis.com/oauth2/v4/token",
"iat": now,
"exp": now + 60,
"target_audience": CLIENT_ID
}
jwt_header = base64.b64encode(json.dumps({"alg": 'RS256', "typ": 'JWT'}).encode('utf-8'))
jwt_claimset = base64.b64encode(json.dumps(claims).encode('utf-8'))
toSign = b'.'.join([jwt_header, jwt_claimset])
blob = requests.post("https://iam.googleapis.com/v1/projects/{0}/serviceAccounts/{1}:signBlob"
.format(PROJECT_ID, service_account),
data={"bytesToSign": base64.b64encode(toSign)},
headers={'User-Agent': USER_AGENT,"Authorization": "Bearer {0}".format(access_token)}).json()
print(blob)
jwt_signature = blob["signature"].encode('utf-8')
jwt = b'.'.join([jwt_header, jwt_claimset, jwt_signature])
req_form = {'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'assertion': jwt}
token = requests.post('https://www.googleapis.com/oauth2/v4/token', data=req_form).json()
return token["id_token"]