Kafka -> S3 -> Redshift workflows


Or Sher

Apr 14, 2016, 11:05:41 AM
to Airflow
Hi guys,

I want to create the following 2 workflows in Airflow:

The first workflow should be triggered every five minutes, consume new Kafka messages, and upload them to S3 under the daily partition.
The second workflow should be triggered once an hour and load all new files (possibly many files across multiple partitions) into Redshift. (See the sketch below.)
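
Just to make the schedules concrete, a rough sketch of the two DAG declarations (the ids and dates are placeholders, nothing is settled yet):

    from datetime import datetime
    from airflow import DAG

    # hypothetical 5-minute DAG: Kafka -> S3
    five_min_dag = DAG('kafka_to_s3',
                       schedule_interval='*/5 * * * *',  # every five minutes
                       start_date=datetime(2016, 4, 1))

    # hypothetical hourly DAG: S3 -> Redshift
    hourly_dag = DAG('s3_to_redshift',
                     schedule_interval='@hourly',
                     start_date=datetime(2016, 4, 1))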


A few questions:

1. Is it a good idea to move the consumption workflow to Airflow? I saw here that Airbnb is using Secor.
2. I need some way to link the output of the first workflow's runs to the second workflow, i.e. I need to know which new files I haven't yet uploaded to Redshift.
   Do you think Airflow could help here? How would you implement a solution for it?

Thanks,
Or.

Jeremiah Lowin

Apr 14, 2016, 6:25:18 PM
to Airflow
Hi Or,

I'm not sure about the first point, but for the second, take a look at XComs (cross-communication between tasks) -- http://pythonhosted.org/airflow/concepts.html#xcoms. XComs are little messages that are stored in the database. Your first task could "push" an XCom with the names of the files it created, and your second task could "pull" that information to parameterize the Redshift call.
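
As a rough sketch of that pattern (all ids, keys, and S3 paths below are made up for illustration, not a prescribed layout):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG('kafka_to_s3_to_redshift',
              schedule_interval='*/5 * * * *',
              start_date=datetime(2016, 4, 1))

    def consume_and_upload(**context):
        # ... consume Kafka messages and write the batch to S3 ...
        new_files = ['s3://my-bucket/dt=2016-04-14/part-0001.gz']  # hypothetical
        # push the file list so a downstream task can find it
        context['ti'].xcom_push(key='new_files', value=new_files)

    def copy_to_redshift(**context):
        # pull the list the upstream task pushed
        new_files = context['ti'].xcom_pull(task_ids='consume_and_upload',
                                            key='new_files')
        # ... build and run a Redshift COPY statement for new_files ...

    push = PythonOperator(task_id='consume_and_upload',
                          python_callable=consume_and_upload,
                          provide_context=True, dag=dag)
    pull = PythonOperator(task_id='copy_to_redshift',
                          python_callable=copy_to_redshift,
                          provide_context=True, dag=dag)
    pull.set_upstream(push)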

Jeremiah

Or Sher

Apr 17, 2016, 2:24:43 AM
to Airflow
Hi Jeremiah,

Thanks for the answer.
I'm already aware of XComs and have actually used them for another workflow.
The problem is that these two workflows are scheduled differently, and hence will be split into two different DAGs:
1st DAG - Consuming Kafka messages every 5 minutes and uploading them to S3.
2nd DAG - Loading all new files into Redshift every hour.

I don't think XComs work across DAGs, do they?

Jeremiah Lowin

Apr 18, 2016, 2:42:58 PM
to Airflow
Hi Or,

That shouldn't be an issue -- by default XComs only search within the same DAG, but by specifying a DAG you should be able to pull from elsewhere. Have a look at the docs for xcom_pull for the full details: you can pass dag_id, task_ids, key, and an include_prior_dates flag to xcom_pull. By default, only the current DAG and execution_date are searched.
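
A hedged sketch of the cross-DAG pull, reusing the hypothetical ids from earlier in the thread:

    def copy_new_files(**context):
        files = context['ti'].xcom_pull(
            dag_id='kafka_to_s3',            # pull from the 5-minute DAG
            task_ids='consume_and_upload',   # the task that pushed the list
            key='new_files',
            include_prior_dates=True,        # also search earlier execution dates
        )
        # Caveat: for a single task id this returns the most recent matching
        # value, so gathering every push since the last hourly run may take
        # some extra bookkeeping on your side.

One thing to watch: since the first DAG runs 12 times per hour, the hourly DAG may need all of the pushed lists since its last run, not just the latest one.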
