I am trying to test the PubSubPublishOperator by running example_pubsub_flow.py as-is, with no modifications. When I execute the DAG, it fails at t3 (task_id: 'publish-messages').
Digging into the log of the t3 task instance, I see an exception raised in models.py, in Airflow's core. I am using Cloud Composer (Airflow 1.10.0).
*** Reading remote log from gs://europe-west1-composer--sema-52258036-bucket/logs/pubsub-end-to-end/publish-messages/2018-11-21T05:14:48.770627+00:00/1.log.
[2018-11-21 05:15:49,476] {models.py:1335} INFO - Dependencies all met for <TaskInstance: pubsub-end-to-end.publish-messages 2018-11-21T05:14:48.770627+00:00 [queued]>
[2018-11-21 05:15:49,508] {models.py:1335} INFO - Dependencies all met for <TaskInstance: pubsub-end-to-end.publish-messages 2018-11-21T05:14:48.770627+00:00 [queued]>
[2018-11-21 05:15:49,509] {models.py:1547} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of
-------------------------------------------------------------------------------
[2018-11-21 05:15:49,526] {models.py:1569} INFO - Executing <Task(PubSubPublishOperator): publish-messages> on 2018-11-21T05:14:48.770627+00:00
[2018-11-21 05:15:49,527] {base_task_runner.py:124} INFO - Running: ['bash', '-c', 'airflow run pubsub-end-to-end publish-messages 2018-11-21T05:14:48.770627+00:00 --job_id 108 --raw -sd DAGS_FOLDER/pubsub_flow.py --cfg_path /tmp/tmpv7o3old4']
[2018-11-21 05:15:51,055] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,054] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2018-11-21 05:15:51,554] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,553] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2018-11-21 05:15:51,556] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,555] {__init__.py:51} INFO - Using executor CeleryExecutor
[2018-11-21 05:15:51,804] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:51,803] {models.py:258} INFO - Filling up the DagBag from /home/airflow/gcs/dags/pubsub_flow.py
[2018-11-21 05:15:54,443] {base_task_runner.py:107} INFO - Job 108: Subtask publish-messages [2018-11-21 05:15:54,443] {cli.py:490} INFO - Running <TaskInstance: pubsub-end-to-end.publish-messages 2018-11-21T05:14:48.770627+00:00 [running]> on host airflow-worker-654c9bdb9c-sm6v9
[2018-11-21 05:15:54,563] {models.py:1736} ERROR - Type '<class 'bytes'>' used for parameter 'messages[data]' is not supported for templating
Snippet of the DAG (example_pubsub_flow.py), part of airflow/.../contrib/*** in the Airflow 1.10 distribution:
with DAG('pubsub-end-to-end', default_args=default_args,
         schedule_interval=datetime.timedelta(days=1)) as dag:
    t1 = PubSubTopicCreateOperator(task_id='create-topic')
    t2 = PubSubSubscriptionCreateOperator(
        task_id='create-subscription', topic_project=project,
        subscription=subscription)
    t3 = PubSubPublishOperator(
        task_id='publish-messages', messages=messages)
    t4 = PubSubPullSensor(task_id='pull-messages', ack_messages=True)
    t5 = BashOperator(task_id='echo-pulled-messages',
                      bash_command=echo_template)
    t6 = PubSubSubscriptionDeleteOperator(task_id='delete-subscription')
    t7 = PubSubTopicDeleteOperator(task_id='delete-topic')

    t1 >> t2 >> t3
    t2 >> t4 >> t5 >> t6 >> t7
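For context on the error itself: `messages` is one of PubSubPublishOperator's templated fields, and the traceback ("Type '<class 'bytes'>' used for parameter 'messages[data]' is not supported for templating") suggests the example builds the message `data` payload with `base64.b64encode()`, which in Python 3 returns `bytes` rather than `str`. A minimal sketch of a payload that avoids this, assuming the helper name `b64_str` (my own, not from the example):

```python
from base64 import b64encode

def b64_str(s):
    """Base64-encode a string and return str, not bytes.

    In Python 3, b64encode() returns bytes; Airflow 1.10.0 rejects
    bytes values inside templated fields such as `messages`, so we
    decode the base64 output back to str before handing it to the
    operator.
    """
    return b64encode(s.encode('utf8')).decode('ascii')

# Pub/Sub message payloads with str-typed base64 data, safe to pass
# as PubSubPublishOperator(messages=messages) under Python 3.
messages = [
    {'data': b64_str('Hello, World!')},
    {'attributes': {'type': 'greeting'}},
]

print(messages[0]['data'])  # → SGVsbG8sIFdvcmxkIQ==
```

This is only a sketch of the type issue, not a confirmed fix for the shipped example; the exact line to change depends on how example_pubsub_flow.py constructs `messages` in your Composer environment.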