ERROR - (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>


Jordy

Jan 7, 2016, 12:07:30 PM
to Airflow
Hey,

My remote worker seems to fail after a while with this message
ERROR - (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger

It seems that it is trying to pickle the logger, but I have no clue why.
I have a two-server test setup: one server runs the Airflow webserver and scheduler,
and the other server runs the worker.
The DAGs are only located on the webserver/scheduler server, and with pickling they should be shipped to the worker.
I use Celery as the executor.

Now, when I added the tutorial DAG (from the official docs) to the dags folder, everything seemed to work fine (I see about 24 successful tasks).
But after a little while nothing seems to happen anymore.
If I check Flower, it tells me that tasks fail with this message:
AirflowException: Celery command failed

And if I then check the worker's log, I get the "Can't pickle the logger" error.

Hopefully someone knows what is going wrong.

Cheers

Jordy

Jan 7, 2016, 12:08:03 PM
to Airflow
The error stack from the worker's log:


[2016-01-07 11:32:18,225] {jobs.py:455} INFO - Getting list of tasks to skip for active runs.
[2016-01-07 11:32:18,227] {jobs.py:470} INFO - Checking dependencies on 18 tasks instances, minus 0 skippable ones
[2016-01-07 11:32:18,371] {jobs.py:633} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-01-07 11:32:18,371] {jobs.py:636} INFO - Loop took: 0.364169 seconds
[2016-01-07 11:32:18,373] {models.py:222} INFO - Finding 'running' jobs without a recent heartbeat
[2016-01-07 11:32:18,373] {models.py:228} INFO - Failing jobs without heartbeat after 2016-01-07 11:30:03.373523
[2016-01-07 11:32:23,011] {jobs.py:507} INFO - Prioritizing 0 queued jobs
[2016-01-07 11:32:23,035] {jobs.py:632} ERROR - (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger [SQL: u'INSERT INTO dag_pickle (pickle, created_dttm, pickle_hash) VALUES (%s, now(), %s)'] [parameters: [{'pickle_hash': -811270095097316367, 'pickle': <DAG: jordy>}]]
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 629, in _execute
    self.process_dag(dag, executor)
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 436, in process_dag
    pickle_id = dag.pickle(session).id
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 2389, in pickle
    session.commit()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 813, in commit
    self.transaction.commit()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
    self._prepare_impl()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2027, in flush
    self._flush(objects)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2145, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2109, in _flush
    flush_context.execute()
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
    rec.execute(self)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
    uow
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
    mapper, table, insert)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 800, in _emit_insert_statements
    execute(statement, params)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1078, in _execute_context
    None, None)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1073, in _execute_context
    context = constructor(dialect, self, conn, *args)
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 582, in _init_compiled
    param.append(processors[key](compiled_params[key]))
  File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/sqltypes.py", line 1241, in process
    value = dumps(value, protocol)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 192, in dumps
    dump(obj, file, protocol, byref, fmode, recurse)#, strictio)
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 186, in dump
    pik.dump(obj)
  File "/usr/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 419, in save_reduce
    save(state)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 748, in save_module_dict
    StockPickler.save_dict(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/python2.7/pickle.py", line 396, in save_reduce
    save(cls)
  File "/usr/lib/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 1102, in save_type
    StockPickler.save_global(pickler, obj)
  File "/usr/lib/python2.7/pickle.py", line 748, in save_global
    (obj, module, name))
StatementError: (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger [SQL: u'INSERT INTO dag_pickle (pickle, created_dttm, pickle_hash) VALUES (%s, now(), %s)'] [parameters: [{'pickle_hash': -811270095097316367, 'pickle': <DAG: jordy>}]]
[2016-01-07 11:32:23,043] {jobs.py:653} ERROR - This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (pickle.PicklingError) Can't pickle <class 'celery.utils.log.ProcessAwareLogger'>: it's not found as celery.utils.log.ProcessAwareLogger [SQL: u'INSERT INTO dag_pickle (pickle, created_dttm, pickle_hash) VALUES (%s, now(), %s)'] [parameters: [{'pickle_hash': -811270095097316367, 'pickle': <DAG: jordy>}]]

Maxime Beauchemin

Jan 13, 2016, 12:13:26 PM
to Airflow
This looks like a scheduler log, not a worker log amirite?

We don't use pickling on our side for the scheduler. I'm guessing you are starting the scheduler with the `-p` (same as `--do_pickle`) flag. Do you need that?

Pickling has only been partially supported, and we need to iron this out in the next release. If your DAG isn't pickleable, you may want to look into the "backdoors" that allow people to pass non-pickleable object references to their DAGs: default_args, params, callbacks, ...
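
For illustration, a minimal hypothetical sketch (the DAG, ids and names are invented, not Jordy's code) of how one of those backdoors can drag a non-serializable object into the pickle, mirroring the logger class in the error above:

import logging
from datetime import datetime
from airflow import DAG
from airflow.operators import BashOperator

# Hypothetical: a live logger object stored in params. Loggers hold
# handlers, locks and streams, which is exactly the kind of thing a
# pickler tends to choke on.
log = logging.getLogger('my_dag_logger')

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 1, 1),
    'params': {'log': log},  # every task now references the logger,
                             # so pickling the DAG must pickle it too
}

dag = DAG('unpicklable_example', default_args=default_args)
BashOperator(task_id='print_date', bash_command='date', dag=dag)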

It's a bit tricky, but you can troubleshoot the reason why your DAG isn't pickleable in IPython with pdb on. It's usually easier to work out by intuition/elimination than through the debugger.
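
A minimal sketch of that kind of check, assuming the DAG sits in the configured dags folder and that dill (which the traceback shows Airflow using for the dag_pickle insert) is installed; the dag_id is just an example:

import dill
from airflow.models import DagBag

dag = DagBag().get_dag('jordy')  # parse the dags folder, then fetch by dag_id

# Try the whole DAG first, then each task, to narrow down the culprit
# by elimination, as suggested above.
for obj in [dag] + dag.tasks:
    try:
        dill.dumps(obj)
        print('%s pickles fine' % obj)
    except Exception as exc:
        print('cannot pickle %s: %s' % (obj, exc))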

Max

Jordy

Jan 13, 2016, 12:43:02 PM
to Airflow
Hey Maxime, thanks for replying!

It does indeed look like the scheduler log, which would make more sense, but I will check to confirm when I am at the office tomorrow.

I did indeed start the scheduler with the --do_pickle flag.
The idea was to only place the DAGs on the server where the scheduler is running, so I do not need to worry about keeping the DAGs in sync.
The DAG "jordy" is actually the "tutorial" DAG from the documentation, but with only the print_date task (the rest of the tasks are removed).
So I found it weird that even the tutorial DAG does not work with pickling, but I did not know that the feature was only partially supported.
I thought that it would just serialize the DAG file, send it to the worker servers, and execute it there as if the file were on that server.
If that were the case, then any text file should be able to be serialized and sent, right?
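
Judging from the traceback above, the answer seems to be no: what lands in the dag_pickle table is the parsed DAG object, not the .py source text, so everything the object references has to be dill-serializable. A rough hypothetical illustration, with example paths and module names:

import dill

# A DAG *file* is just text and would always serialize:
with open('dags/tutorial.py') as f:
    dill.dumps(f.read())        # plain string, never a problem

# But the scheduler pickles the DAG *object*:
import tutorial                 # assumes the dags folder is on sys.path
dill.dumps(tutorial.dag)        # fails if the object graph drags in a
                                # logger, connection, lock or similar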

I will dig a bit deeper into the Airflow code to see what is really going on.

Thanks Max