Running Dataflow locally with DirectRunner, reading from Pub/Sub

Denis Haskin

Apr 23, 2021, 4:48:31 PM
to pubsub-discuss
(I know this is more of a Dataflow question, but there's no Dataflow discussion group, and there seem to be plenty of Dataflow questions here...)

I'm trying to run a trivial example (based on the sql_taxi.py example in the Apache Beam repo) that reads from a Pub/Sub subscription in a Dataflow pipeline, running locally with DirectRunner.

The code:

import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def logit(message):
    # With with_attributes=True, each element is a PubsubMessage:
    # .data holds the raw payload bytes, .attributes a dict of attributes.
    # (After a plain json.loads step the element would be a dict, which has
    # no .attributes.)
    logging.info('--- element and attributes ---')
    logging.info(json.loads(message.data))
    logging.info(message.attributes)


def run(pipeline_args):
    pipeline_options = PipelineOptions(
        pipeline_args, save_main_session=True, streaming=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | beam.io.ReadFromPubSub(
                # topic='projects/pubsub-public-data/topics/taxirides-realtime',
                subscription='projects/em-dev-dh/subscriptions/taxirides-realtime',
                timestamp_attribute='ts',
                # Emit PubsubMessage objects instead of bare bytes so the
                # attributes are still available downstream.
                with_attributes=True)
            | 'logit' >> beam.Map(logit)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args()

    run(pipeline_args)

Running it with service account credentials:

GOOGLE_APPLICATION_CREDENTIALS=em-dev-dh-e1b78bd54750.json python taxi.py --runner=DirectRunner

It errors out, and it's very unclear why. Any suggestions? Thanks for any help!

Error output:

INFO:apache_beam.runners.direct.direct_runner:Running pipeline with DirectRunner.
request: projects/em-dev-dh/subscriptions/taxirides-realtime
subscription None
return_immediately True
max_messages 10
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7fac3218c4c8>, due to an exception.
 Traceback (most recent call last):
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 694, in _read_from_pubsub
    self._sub_name, max_messages=10, return_immediately=True)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda>
    fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)  # noqa
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1111, in pull
    "If the `request` argument is set, then none of "
ValueError: If the `request` argument is set, then none of the individual field arguments should be set.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/executor.py", line 382, in call
    finish_state)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/executor.py", line 423, in attempt_call
    result = evaluator.finish_bundle()
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 706, in finish_bundle
    data = self._read_from_pubsub(self.source.timestamp_attribute)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 700, in _read_from_pubsub
    sub_client.api.transport.channel.close()
AttributeError: 'SubscriberGrpcTransport' object has no attribute 'channel'

ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception: 'SubscriberGrpcTransport' object has no attribute 'channel'
Traceback (most recent call last):
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 694, in _read_from_pubsub
    self._sub_name, max_messages=10, return_immediately=True)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda>
    fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)  # noqa
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1111, in pull
    "If the `request` argument is set, then none of "
ValueError: If the `request` argument is set, then none of the individual field arguments should be set.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "taxi.py", line 33, in <module>
    run(pipeline_args)
  File "taxi.py", line 23, in run
    | "logit" >> beam.Map(logit)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/direct_runner.py", line 590, in wait_until_finish
    self._executor.await_completion()
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/executor.py", line 439, in await_completion
    self._executor.await_completion()
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/executor.py", line 488, in await_completion
    raise_(t, v, tb)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/future/utils/__init__.py", line 441, in raise_
    raise exc
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/executor.py", line 382, in call
    finish_state)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/executor.py", line 423, in attempt_call
    result = evaluator.finish_bundle()
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 706, in finish_bundle
    data = self._read_from_pubsub(self.source.timestamp_attribute)
  File "/Users/denis/redacted/env/lib/python3.6/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 700, in _read_from_pubsub
    sub_client.api.transport.channel.close()
AttributeError: 'SubscriberGrpcTransport' object has no attribute 'channel'

Denis Haskin

Apr 25, 2021, 2:09:06 PM

FYI, this turned out to be caused by incompatible package versions. My requirements.txt had been:

apache_beam[gcp]
google_apitools
google-cloud-pubsub

but the separate, unpinned google-cloud-pubsub line was installing a version of that package that broke apache_beam. I changed my requirements.txt to:

apache_beam[gcp]
google_apitools

and it all works now!
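The ValueError in the original traceback is consistent with a client API change. The debug lines printed before each failure (`request: projects/em-dev-dh/subscriptions/taxirides-realtime`, `subscription None`) show the subscription path landing in the client's `request` parameter, while the traceback shows Beam calling `pull(self._sub_name, max_messages=10, return_immediately=True)`. The traceback path `google/pubsub_v1/services/subscriber/client.py` appears to be the newer generated client (google-cloud-pubsub 2.x, as far as I can tell), whose first positional parameter is a whole request object. The sketch below is a simplified, hypothetical stand-in for such a `pull` method, not the real library code; calling it the old way reproduces the same error.

```python
def pull(request=None, *, subscription=None, return_immediately=None,
         max_messages=None):
    """Hypothetical stand-in for a generated client's pull() method.

    Accepts either a whole request object or individual keyword fields,
    but not both at once.
    """
    has_flattened_params = any([subscription, return_immediately, max_messages])
    if request is not None and has_flattened_params:
        raise ValueError(
            "If the `request` argument is set, then none of "
            "the individual field arguments should be set."
        )
    if request is None:
        # Build the request from the keyword-only fields.
        request = {
            "subscription": subscription,
            "return_immediately": return_immediately,
            "max_messages": max_messages,
        }
    return request


sub_name = "projects/em-dev-dh/subscriptions/taxirides-realtime"

# Old-style call: the subscription path is passed positionally, so it binds
# to `request`, and the extra keyword fields trigger the ValueError.
try:
    pull(sub_name, max_messages=10, return_immediately=True)
except ValueError as exc:
    print("old-style call failed:", exc)

# Passing the subscription as a keyword matches the new signature.
print(pull(subscription=sub_name, max_messages=10, return_immediately=True))
```

The follow-on AttributeError (`'SubscriberGrpcTransport' object has no attribute 'channel'`) comes from Beam's cleanup code hitting the same mismatch, which is why removing the extra google-cloud-pubsub requirement and letting apache_beam[gcp] choose a compatible version fixes both.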

For what it's worth, when running locally with DirectRunner I didn't need many of the options that DataflowRunner requires. This sufficed:

GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json \
 RUNNER=DirectRunner \
 INPUT_SUBSCRIPTION=projects/myproject/subscriptions/mysubscription \
 python read-pubsub-with-dataflow.py
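The script itself isn't shown, so how it consumes these variables is an assumption. A minimal sketch of one way to turn them into Beam pipeline arguments: `RUNNER` and `INPUT_SUBSCRIPTION` come from the command above, while `PROJECT`, `REGION` and `TEMP_LOCATION` are hypothetical names for the extra settings DataflowRunner needs (`--project`, `--region`, `--temp_location`). `GOOGLE_APPLICATION_CREDENTIALS` is not read by the script at all; the Google auth library picks it up as Application Default Credentials.

```python
import os


def pipeline_args_from_env(env=None):
    """Build Beam pipeline arguments from environment variables.

    RUNNER and INPUT_SUBSCRIPTION mirror the command in the post;
    PROJECT, REGION and TEMP_LOCATION are hypothetical variable names
    used only when targeting DataflowRunner.
    """
    env = os.environ if env is None else env
    runner = env.get("RUNNER", "DirectRunner")
    subscription = env["INPUT_SUBSCRIPTION"]

    args = ["--runner=" + runner]
    if runner == "DataflowRunner":
        # Dataflow needs to know which project and region to run in and
        # where to stage temporary files; DirectRunner runs in-process.
        args += [
            "--project=" + env["PROJECT"],
            "--region=" + env["REGION"],
            "--temp_location=" + env["TEMP_LOCATION"],
        ]
    return args, subscription


if __name__ == "__main__":
    args, subscription = pipeline_args_from_env({
        "RUNNER": "DirectRunner",
        "INPUT_SUBSCRIPTION": "projects/myproject/subscriptions/mysubscription",
    })
    print(args, subscription)
```

The returned list can be passed to `PipelineOptions(args, streaming=True)` and the subscription to `beam.io.ReadFromPubSub(subscription=...)`.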

Sway [Cloud Support]

Apr 26, 2021, 11:46:51 AM

Thanks for following up and posting what resolved the issue, which turned out to be related to dependencies; it can help others.

I found two other similar threads [1][2] that reported the same thing.
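When a dependency conflict like this is suspected, it helps to see which versions pip actually installed. A minimal sketch using the standard library's `importlib.metadata` (Python 3.8+; the environment in this thread was Python 3.6, where `pip show apache-beam google-cloud-pubsub` gives the same information). The package list is just the two packages involved here; `pip check` will additionally flag installed packages whose declared requirements are not satisfied.

```python
from importlib import metadata  # Python 3.8+


def installed_versions(packages):
    """Return {distribution name: installed version, or None if missing}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(
        ["apache-beam", "google-cloud-pubsub"]
    ).items():
        print(f"{name}: {version or 'not installed'}")
```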

Denis Haskin

Apr 28, 2021, 1:36:39 PM
One of those threads is mine :-)