Airflow example_pubsub.py failing with exception ("ERROR - Type '<class 'bytes'>' used for...)


jeffre...@google.com

Nov 21, 2018, 1:25:04 PM
to cloud-composer-discuss
Hi -

I am trying to test the PubSubPublishOperator by way of example_pubsub_flow.py. The DAG was not modified in any way. When I execute the DAG, it simply errors in t3 (task id: 'publish-messages').

Digging into the log of the t3 task instance, I see an exception raised in models.py in Airflow core. I am using Cloud Composer (Airflow 1.10.0).

Log Snippet:

*** Reading remote log from gs://europe-west1-composer--sema-52258036-bucket/logs/pubsub-end-to-end/publish-messages/2018-11-21T05:14:48.770627+00:00/1.log.
[2018-11-21 05:15:49,476] {models.py:1335} INFO - Dependencies all met for <TaskInstance: pubsub-end-to-end.publish-messages 2018-11-21T05:14:48.770627+00:00 [queued]>
[2018-11-21 05:15:49,508] {models.py:1335} INFO - Dependencies all met for <TaskInstance: pubsub-end-to-end.publish-messages 2018-11-21T05:14:48.770627+00:00 [queued]>
[2018-11-21 05:15:49,509] {models.py:1547} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of 
-------------------------------------------------------------------------------

[2018-11-21 05:15:49,526] {models.py:1569} INFO - Executing <Task(PubSubPublishOperator): publish-messages> on 2018-11-21T05:14:48.770627+00:00
[2018-11-21 05:15:49,527] {base_task_runner.py:124} INFO - Running: ['bash', '-c', 'airflow run pubsub-end-to-end publish-messages 2018-11-21T05:14:48.770627+00:00 --job_id 108 --raw -sd DAGS_FOLDER/pubsub_flow.py --cfg_path /tmp/tmpv7o3old4']
[2018-11-21 05:15:51,055] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,054] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-11-21 05:15:51,554] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,553] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2018-11-21 05:15:51,556] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,555] {__init__.py:51} INFO - Using executor CeleryExecutor
[2018-11-21 05:15:51,804] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,803] {models.py:258} INFO - Filling up the DagBag from /home/airflow/gcs/dags/pubsub_flow.py
[2018-11-21 05:15:54,443] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:54,443] {cli.py:490} INFO - Running <TaskInstance: pubsub-end-to-end.publish-messages 2018-11-21T05:14:48.770627+00:00 [running]> on host airflow-worker-654c9bdb9c-sm6v9
[2018-11-21 05:15:54,563] {models.py:1736} ERROR - Type '<class 'bytes'>' used for parameter 'messages[data]' is not supported for templating

Attached: t3-publish-messages.logs

Snippet of the DAG (example_pubsub_flow.py), part of airflow/.../contrib/*** in the Airflow 1.10 distribution:

with DAG('pubsub-end-to-end', default_args=default_args,
         schedule_interval=datetime.timedelta(days=1)) as dag:
    t1 = PubSubTopicCreateOperator(task_id='create-topic')
    t2 = PubSubSubscriptionCreateOperator(
        task_id='create-subscription', topic_project=project,
        subscription=subscription)
    t3 = PubSubPublishOperator(
        task_id='publish-messages', messages=messages)
    t4 = PubSubPullSensor(task_id='pull-messages', ack_messages=True)
    t5 = BashOperator(task_id='echo-pulled-messages',
                      bash_command=echo_template)
    t6 = PubSubSubscriptionDeleteOperator(task_id='delete-subscription')
    t7 = PubSubTopicDeleteOperator(task_id='delete-topic')

    t1 >> t2 >> t3
    t2 >> t4 >> t5 >> t6 >> t7
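
For reference, the stock example builds its message payloads with base64-encoded data along these lines (reconstructed from memory, so the exact names and literals are only illustrative). The relevant detail is that base64.b64encode returns bytes under Python 3, and 'messages' is, as far as I can tell, one of the operator's template_fields, so Airflow tries to render it:

from base64 import b64encode as b64e

# Pub/Sub wants message data base64-encoded. Under Python 3,
# b64encode() returns bytes, not str, so every messages[...]['data']
# value is a bytes object by the time Airflow walks the 'messages'
# template field -- which is what produces the error above.
messages = [
    {'data': b64e(b'Hello, World!')},
    {'data': b64e(b'Another message')},
    {'attributes': {'foo': ''}},
]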

Wilson Lian

Nov 21, 2018, 3:53:31 PM
to jeffre...@google.com, cloud-composer-discuss
Are you by any chance running this in a Python 3 environment?


Jeffrey Lucas

Nov 21, 2018, 4:05:22 PM
to wwl...@google.com, cloud-compo...@googlegroups.com
Not sure. I am using the beta feature of Composer and used gcloud composer ... to create the Composer environment with Airflow 1.10.



jeffre...@google.com

Nov 21, 2018, 11:19:30 PM
to cloud-composer-discuss
Looks like I am running Python 3. It appears this is required since I am using Airflow 1.10; Cloud Composer gives me an error if I attempt to pair Python 2 with Airflow 1.10. Alas, Airflow 1.9.0 isn't fully supported for the GCP integration; most of the operators don't work.

Regarding this type exception: what is the protocol for the message 'data' field? Do I really need to encode it as a binary representation? Why can't I use a string or something else? The documentation is truly lacking with the Airflow stack, especially the contrib operators for the GCP stack. It would be nice to be able to use the basic GCS, Pub/Sub, Cloud SQL, etc. operators out of the box.
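
A workaround I am considering (an untested sketch, assuming the hook simply forwards the 'data' value into the Pub/Sub publish request body, which carries the payload as a base64 string): decode the b64encode output to str so the templating layer never sees bytes:

from base64 import b64encode

def b64_str(raw_bytes):
    # Base64-encode and decode to ASCII so that, under Python 3,
    # Airflow's templating only ever sees str values inside the
    # nested 'messages' structure.
    return b64encode(raw_bytes).decode('ascii')

messages = [
    {'data': b64_str(b'Hello, World!')},
    {'data': b64_str(b'Another message')},
]

If that assumption holds, the publish call should be unaffected, since the JSON request body needs a string there in any case.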

Does anyone know the procedure for filing an issue in the bug tracking system? This is a legit bug, as it simply doesn't work as advertised :).

-Jeff

