How to solve the Sql Deadlock error in Airflow

Heqing Huang

Aug 14, 2019, 6:30:08 PM
to cloud-composer-discuss
[2019-07-23 14:02:49,527] {models.py:1760} ERROR - This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: 'UPDATE task_instance SET state=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.execution_date = %s'] [parameters: ('queued', 'SnapshotCassandraTablesInBq-4-3-pipeline_sensor', 'snapshot_cassandra_tables_in_bq.SnapshotCassandraTablesInBq-4-3', datetime.datetime(2019, 7, 23, 12, 0, tzinfo=<Timezone [UTC]>))]
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 198, in execute
    res = self._query(query)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 304, in _query
    db.query(q)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/connections.py", line 217, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/jobs.py", line 2545, in _execute
    session=session)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/jobs.py", line 2499, in _execute_for_run_dates
    session=session)
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/jobs.py", line 2363, in _process_backfill_task_instances
    session.commit()
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 927, in commit
    self.transaction.commit()
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
    self._prepare_impl()
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
    self.session.flush()

We are seeing this kind of log in our console and have been investigating how to resolve it.
After the error occurs, the DAG goes for a retry and the ETL pipeline kicks off successfully.
Is there a way to filter out these error messages, or to send an error only when the DAG has actually failed?
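For context on the error itself: MySQL's message ("try restarting transaction") means a deadlock is retryable at the transaction level, which is also why the retried DAG run succeeds. A minimal stdlib-only sketch of that retry pattern (the helper names `is_deadlock` and `run_with_deadlock_retry` are hypothetical, not Airflow or SQLAlchemy APIs):

```python
import time

MYSQL_DEADLOCK = 1213  # (1213, 'Deadlock found when trying to get lock; ...')

def is_deadlock(exc):
    """True if exc (or the DB-API error it wraps) carries MySQL error code 1213.

    SQLAlchemy wraps the driver's exception in `.orig`; a raw MySQLdb
    OperationalError carries the code directly in `.args[0]`.
    """
    orig = getattr(exc, "orig", exc)
    args = getattr(orig, "args", ())
    return bool(args) and args[0] == MYSQL_DEADLOCK

def run_with_deadlock_retry(fn, rollback, max_attempts=3, delay=0.5):
    """Call fn(); on a MySQL deadlock, roll back and retry with linear backoff.

    rollback() runs after every failure, matching the log's advice that the
    session must be rolled back before it can be used again.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            rollback()
            if not is_deadlock(exc) or attempt == max_attempts:
                raise
            time.sleep(delay * attempt)  # back off before restarting the txn
```

This is only a sketch of the restart-the-transaction idea; inside Airflow's scheduler the commit in question is internal, so in practice the task-level retry you are already seeing is what handles it.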
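On alerting only for real failures: one common pattern is a task-level `on_failure_callback`, which Airflow invokes only when the task instance finally fails, i.e. after its retries are exhausted, so deadlocks that succeed on retry never trigger it. A sketch assuming Airflow 1.10-style `default_args` keys (the `notify_failure` helper and its message are hypothetical):

```python
from datetime import timedelta

def notify_failure(context):
    """on_failure_callback: Airflow calls this only when a task instance
    ends in the 'failed' state, after all retries are used up.
    Transient deadlocks that succeed on retry never reach this point."""
    ti = context["task_instance"]
    # Replace with your real alerting (email, Slack, PagerDuty, ...).
    return f"DAG {ti.dag_id}: task {ti.task_id} failed after all retries"

# Passed to the DAG constructor via default_args; these are standard
# Airflow 1.10 argument names.
default_args = {
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_retry": False,           # no email for each retried attempt
    "on_failure_callback": notify_failure,
}
```

With `email_on_retry=False` the intermediate deadlock attempts stay quiet, and the callback fires only for a genuinely failed task.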