xcom push error


leonardo zanella

Nov 19, 2018, 1:31:21 PM
to cloud-composer-discuss
Hi all, I'm currently working with the beta version of Google Cloud Composer, with Airflow 1.10 and Python 3.

In one of my DAGs I'm passing a pandas DataFrame (as JSON, so it can be pushed to XCom) and I get this error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 250, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 247, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 412, in _query
    rowcount = self._do_query(q)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 375, in _do_query
    db.query(q)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 276, in query
    _mysql.connection.query(self, query)
_mysql_exceptions.DataError: (1406, "Data too long for column 'value' at row 1")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/src/core/analytics.py", line 26, in get_guids_daily
    task_instance.xcom_push(key='data', value=data.to_json())
  File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
    execution_date=execution_date or self.execution_date)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 4557, in set
    session.commit()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 927, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2209, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2329, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
    raise value
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2293, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 389, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 548, in execute
    uow
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181, in save_obj
    mapper, table, insert)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 835, in _emit_insert_statements
    execute(statement, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1402, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 250, in execute
    self.errorhandler(self, exc, value)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 247, in execute
    res = self._query(query)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 412, in _query
    rowcount = self._do_query(q)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 375, in _do_query
    db.query(q)
  File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 276, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.DataError: (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1") [SQL: 'INSERT INTO xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, %s, %s, %s, %s)'] [parameters: ('data', b'"{\\"ga:date\\":{\\"0\\":\\"20181118\\",\\"1\\":\\"20181118\\",\\"2\\":\\"20181118\\",\\"3\\":\\"20181118\\",\\"4\\":\\"20181118\\",\\"5\\":\\"20181 ... (227910 characters truncated) ... credi\\",\\"523\\":\\"bcredi\\",\\"524\\":\\"bcredi\\",\\"525\\":\\"bcredi\\",\\"526\\":\\"bcredi\\",\\"527\\":\\"bcredi\\",\\"528\\":\\"bcredi\\"}}"', datetime.datetime(2018, 11, 19, 18, 15, 12, 189942, tzinfo=<Timezone [UTC]>)

This does not happen on my local instance, and I don't know why.

Can someone help me, please?

Thanks

Tim Swast

Nov 30, 2018, 6:24:43 PM
to cloud-composer-discuss
XCom is not intended for large values such as a pandas DataFrame. I'd recommend restructuring your DAG to serialize the data to the /home/airflow/gcs/data folder (see: https://cloud.google.com/composer/docs/concepts/cloud-storage#folders_in_the_storage_name_bucket) or to write it to Google Cloud Storage directly, then pass the path to the file in XCom.
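
A minimal sketch of that approach, for illustration only: the upstream task writes the JSON to a file and pushes just the path, and the downstream task pulls the path and reads the file. The function names, the `data_path` key, and the `guids.json` filename are hypothetical. `task_instance` is assumed to be the Airflow TaskInstance from the task context, and `data` is anything with a `to_json()` method, such as the DataFrame in the traceback.

```python
import os

# Folder suggested above; on Composer it is backed by the environment's bucket.
DEFAULT_DATA_DIR = "/home/airflow/gcs/data"


def push_dataframe_path(task_instance, data, filename, data_dir=DEFAULT_DATA_DIR):
    """Write data.to_json() to a file and push only the file path to XCom."""
    path = os.path.join(data_dir, filename)
    with open(path, "w") as f:
        f.write(data.to_json())
    # The XCom value is now a short string instead of the whole payload.
    task_instance.xcom_push(key="data_path", value=path)
    return path


def pull_dataframe_json(task_instance, upstream_task_id):
    """Pull the path pushed by the upstream task and return the JSON text."""
    path = task_instance.xcom_pull(task_ids=upstream_task_id, key="data_path")
    with open(path) as f:
        return f.read()
```

In `get_guids_daily` this would replace the `xcom_push(key='data', value=data.to_json())` call with something like `push_dataframe_path(task_instance, data, "guids.json")`. Downstream, `pandas.read_json(path)` could load the file back into a DataFrame.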

z...@useracquisition.com

Nov 30, 2018, 7:01:27 PM
to cloud-composer-discuss
Yeah, there's a limit on the number of bytes in an XCom value, so you're probably hitting it with production data. It might be working locally because your test data happens to be smaller.
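
A rough way to check whether a value is likely to fit before pushing it, as a sketch under stated assumptions: the 65,535-byte figure is the maximum size of a MySQL BLOB column, and it is only an assumption that the `value` column in this environment is a BLOB. The JSON encoding step is inferred from the parameters shown in the error, where the stored value is a JSON-quoted string. The function names are hypothetical.

```python
import json

# Assumption: the xcom.value column is a MySQL BLOB (max 65,535 bytes).
ASSUMED_XCOM_LIMIT_BYTES = 65535


def xcom_payload_size(value):
    """Size in bytes of the value once JSON-encoded, as in the error above."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value, limit=ASSUMED_XCOM_LIMIT_BYTES):
    """True if the encoded value is within the assumed column limit."""
    return xcom_payload_size(value) <= limit
```

The payload in the traceback is well over 200,000 characters, so it would fail this check, while a file path would pass easily.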