Instantiate Dataproc Workflow Template based on log messages


Deena Dhayal

Jul 8, 2020, 1:10:28 AM
to Google Cloud Dataproc Discussions
Hi everyone,

I want to trigger a cloud function to instantiate a Dataproc workflow template based on Stackdriver log messages (whenever a Dataproc cluster fails to create due to a zonal failure). I have created a sink for the Dataproc activity log with a Pub/Sub topic, which is then used by the cloud function. But I need help with responding to the event and instantiating the Dataproc workflow template using either Python or Node.js.

Any help would be appreciated.

Thanks,
Deena

karth...@google.com

Jul 8, 2020, 5:22:44 PM
to Google Cloud Dataproc Discussions
You can use the Python API client: https://googleapis.dev/python/dataproc/latest/index.html. The client API is nearly the same as the REST API. You can also look at `gcloud dataproc workflow-templates instantiate-from-file`.
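
For example, a minimal sketch with the Python client (the project, region, and template IDs are placeholders, and the exact method signature may vary slightly between client-library versions):

```
from google.cloud import dataproc_v1 as dataproc

def run_template(project_id, region, template_id):
    # A regional endpoint is required for any region other than "global".
    client = dataproc.WorkflowTemplateServiceClient(
        client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'}
    )
    name = f'projects/{project_id}/regions/{region}/workflowTemplates/{template_id}'
    operation = client.instantiate_workflow_template(name=name)
    operation.result()  # block until the workflow finishes
    print(f'Workflow {template_id} finished.')
```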

If you're just trying to re-run a workflow template when cluster creation fails, consider using something like Apache Airflow to handle retries. You can also start Airflow DAGs via a cloud function: https://cloud.google.com/composer
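
A rough sketch of that Airflow route, assuming the Google provider package (the operator's import path differs across Airflow versions, and the IDs below are placeholders):

```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocInstantiateWorkflowTemplateOperator,
)

with DAG(
    dag_id='dataproc_workflow_with_retries',
    start_date=datetime(2020, 7, 1),
    schedule_interval=None,  # triggered externally, e.g. from a cloud function
) as dag:
    DataprocInstantiateWorkflowTemplateOperator(
        task_id='run_workflow_template',
        project_id='my-project',   # placeholder
        region='us-central1',      # placeholder
        template_id='my-template', # placeholder
        retries=3,                 # re-run automatically on failure
        retry_delay=timedelta(minutes=5),
    )
```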

Deena Dhayal

Jul 9, 2020, 4:02:03 AM
to Google Cloud Dataproc Discussions
Thanks for the reply, Karthik!

I'm trying to trigger a cloud function to create a new Dataproc cluster (not a workflow template) in response to a Pub/Sub topic that receives a message whenever a Dataproc cluster creation fails with a non-zero status code (status_code != 0) in Cloud Logging (the Dataproc activity log).
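
(For reference, the sink feeding that topic can also be created programmatically; a rough sketch with the google-cloud-logging client is below. The filter fields are assumptions based on typical Dataproc audit log entries, so verify them against your actual logs, and the project, sink, and topic names are placeholders.)

```
from google.cloud import logging

FILTER = '''
resource.type="cloud_dataproc_cluster"
protoPayload.methodName:"CreateCluster"
protoPayload.status.code!=0
'''

client = logging.Client(project='my-project')  # placeholder project
sink = client.sink(
    'dataproc-create-failures',  # placeholder sink name
    filter_=FILTER,
    destination='pubsub.googleapis.com/projects/my-project/topics/dataproc-failures',
)
sink.create()
```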

I have written the Python code below for this scenario, but the cloud function crashes immediately after being triggered by Pub/Sub.

Could you please check what is wrong with this code and what needs to be modified for it to execute successfully in the above scenario?

```
import base64
import json

from google.cloud import dataproc_v1 as dataproc

# Placeholders for the values redacted (****) in the original post.
CONFIG_BUCKET = 'my-config-bucket'  # placeholder
ZONE_URI = 'us-central1-b'          # placeholder
SUBNETWORK_URI = 'default'          # placeholder


def dataproc_workflow(event, context):
    """
    Triggered by a Cloud Pub/Sub message containing a Dataproc
    audit-activity log entry with a non-zero status code.
    """
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    msg_json = json.loads(pubsub_message)
    proto_payload = msg_json['protoPayload']
    # resourceName has the form projects/<project>/regions/<region>/clusters/<name>,
    # so the project, region, and cluster name can be parsed from it.
    resource_name = proto_payload['resourceName']
    _, project_id, _, region, _, cluster_name = resource_name.split('/')
    create_cluster(project_id, region, cluster_name)
    print(f"Cluster created: {cluster_name}.")


def create_cluster(project_id, region, cluster_name):
    print('Creating cluster...')
    # A regional endpoint is required for any region other than "global".
    client = dataproc.ClusterControllerClient(
        client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'}
    )
    # Note: 'region' is not a field of the Cluster message; it is passed to
    # create_cluster() separately below.
    cluster_data = {
        'project_id': project_id,
        'cluster_name': cluster_name,
        'config': {
            'config_bucket': CONFIG_BUCKET,
            'gce_cluster_config': {
                'zone_uri': ZONE_URI,
                'subnetwork_uri': SUBNETWORK_URI,
                'internal_ip_only': True,  # Python booleans are capitalized
                'service_account_scopes': [
                    'https://www.googleapis.com/auth/cloud-platform'
                ],
                'tags': ['dataproc-rule'],
            },
            'master_config': {
                'num_instances': 1,
                'machine_type_uri': 'n1-standard-1'
            },
            'worker_config': {
                'num_instances': 2,
                'machine_type_uri': 'n1-standard-1'
            }
        }
    }
    operation = client.create_cluster(
        project_id=project_id, region=region, cluster=cluster_data
    )
    result = operation.result()  # block until cluster creation completes
    print("After cluster create")
    return result
```
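
A quick way to reproduce the crash locally is to invoke the function with a simulated Pub/Sub envelope before deploying; a hypothetical harness (all payload values below are fabricated for illustration):

```
import base64
import json

# Simulate the Pub/Sub envelope the function receives from the log sink.
fake_log = {
    'protoPayload': {
        'resourceName': 'projects/my-project/regions/us-central1/clusters/my-cluster',
        'authenticationInfo': {'principalEmail': 'user@example.com'},
    }
}
event = {'data': base64.b64encode(json.dumps(fake_log).encode('utf-8'))}
dataproc_workflow(event, context=None)
```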

Also, please advise: is this the ideal solution for achieving high availability of a Dataproc cluster across zonal failures? If there is a best practice, please share it.


Thanks,

Deena

