Airflow breaks trying to push execution_date to XCOM as it attempts to serialize a datetime object to JSON


Rodrigo Oliveira

Jul 24, 2019, 4:13:09 PM
to cloud-composer-discuss
The xcom_push function in airflow.models.taskinstance automatically passes execution_date to XCOM. This can only work with pickling enabled, but in Google Composer's Airflow 1.10.2 image `core-enable_xcom_pickling` is hardcoded to `False`. Consequently, XCOM attempts to JSON-serialize a `datetime` object, which is unsupported.

How can I work around this?

The stacktrace I get when attempting to run a task which returns something to XCOM is the following:

TypeError: Object of type 'datetime' is not JSON serializable
[2019-07-24 19:14:53,193] {models.py:1796} ERROR - Object of type 'datetime' is not JSON serializable
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1668, in _run_raw_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/usr/local/lib/airflow/airflow/models.py", line 2063, in xcom_push
    execution_date=execution_date or self.execution_date)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 4785, in set
    value = json.dumps(value).encode('UTF-8')
  File "/opt/python3.6/lib/python3.6/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/python3.6/lib/python3.6/json/encoder.py", line 180, in default
    o.__class__.__name__)
TypeError: Object of type 'datetime' is not JSON serializable

Thanks in advance

Kiran Arshewar

Jul 24, 2019, 4:37:34 PM
to Rodrigo Oliveira, cloud-composer-discuss

The error does not seem to be an XCOM error.

--
You received this message because you are subscribed to the Google Groups "cloud-composer-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cloud-composer-di...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cloud-composer-discuss/87b2b5ca-1143-4741-9a54-9306754703e1%40googlegroups.com.

Rodrigo Oliveira

Jul 24, 2019, 4:49:30 PM
to cloud-composer-discuss
It is caused by Airflow's taskinstance calling xcom_push, which calls xcom.set with a datetime object for execution_date. This breaks when airflow.cfg has core-enable_xcom_pickling=False.
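
For reference, the failing call in the traceback is json.dumps(value) inside the XCOM set method. The sketch below reproduces the same TypeError outside Airflow, assuming the pushed value contains a datetime. The payload and the key name finished_at are hypothetical and do not come from the original DAG.

```python
import datetime
import json

# Hypothetical XCOM payload: any value containing a datetime fails to encode.
value = {"finished_at": datetime.datetime(2019, 7, 24, 19, 14, 53)}

try:
    json.dumps(value)
    error_message = None
except TypeError as exc:
    # The exact wording of the message varies slightly between Python versions.
    error_message = str(exc)

print(error_message)

# Converting the datetime to an ISO 8601 string first makes the value JSON-safe.
safe_value = {"finished_at": value["finished_at"].isoformat()}
encoded = json.dumps(safe_value)
print(encoded)
```

If that assumption holds, returning only JSON-safe values (strings, numbers, lists, dicts) from the task may avoid the error without touching the encoder.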

Rodrigo Oliveira

Jul 24, 2019, 5:00:29 PM
to cloud-composer-discuss
I was able to work around this issue by setting the json default encoder to one which supports datetime objects. I used this util:

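import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    """JSON Encoder which supports encoding datetime objects"""

    def default(self, obj):  # type: ignore
        """Support datetime encoding"""
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        elif isinstance(obj, datetime.timedelta):
            # Rendered as a time of day, so only meaningful for durations between 0 and 24 hours.
            return (datetime.datetime.min + obj).time().isoformat()
        # Defer to the base class, which raises TypeError for unsupported types.
        return super().default(obj)


def set_json_default_encoder() -> None:
    """Sets the default JSON encoder to DateTimeEncoder"""
    # json._default_encoder is private; json.dumps() uses it only when called without extra options.
    json._default_encoder = DateTimeEncoder()  # type: ignore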
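
A minimal sketch of the effect of this override, assuming it runs at import time in a module that the worker loads before the task executes, for example at the top of the DAG file. That placement is an assumption, since the thread does not say where the util was called. The encoder here is a trimmed copy of the one above, and the payload is hypothetical.

```python
import datetime
import json


class DateTimeEncoder(json.JSONEncoder):
    """Trimmed copy of the encoder above, for illustration only."""

    def default(self, obj):
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        return super().default(obj)


# Replace the private module-level encoder that plain json.dumps(obj) calls use.
json._default_encoder = DateTimeEncoder()

# Hypothetical payload containing a datetime.
payload = {"execution_date": datetime.datetime(2019, 7, 24, 19, 14, 53)}

# A call with only default arguments goes through the patched encoder.
encoded = json.dumps(payload)
print(encoded)  # {"execution_date": "2019-07-24T19:14:53"}

# A call with explicit options builds a fresh JSONEncoder and bypasses the patch.
try:
    json.dumps(payload, indent=2)
    bypassed = False
except TypeError:
    bypassed = True
print(bypassed)  # True
```

Because json._default_encoder is a private attribute, the patch affects every plain json.dumps call in the process and may stop working in a future Python release.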

Jillian Kozyra

Jan 13, 2022, 5:03:17 PM
to cloud-composer-discuss
Where does this override go?