Idiomatic way to track state during ETL processing

Andrew Martin

Mar 1, 2022, 12:32:12 AM
to sqlalchemy
I haven't found any topics here that address this, which may mean the answer is so simple that I'm just overthinking it.

Context: I'm the lone developer on a new tech team at a company that's never had any team in place before. Everything is greenfield. Which is great because I get to do everything from scratch with no legacy stuff to account for. But it's also kind of a lot because I *have* to do everything from scratch.

The ETL pipeline I've prototyped takes a bunch of garbage data from Salesforce, tries to make it not-garbage, and puts it back into Salesforce. We start by normalizing addresses through shipping APIs, then use the normalized addresses to query various data broker APIs (FirstAmerican, Experian, Whitepages, etc.) for owner information, followed by contact information.

I'm using Apache Airflow as a central automation manager and have it set up so that DAGs don't do anything more than call methods on a set of objects inside an "app".

Each API that we deal with is its own object and has methods for handling its records based on their current state. I load all the data that needs to be processed into Postgres so we aren't constantly bumping into the Salesforce API, and I track state inside Postgres with a state table.

Problem: what's the idiomatic way to manage this? I have something that works, but because this is all new, and our team will be growing soon, I want to lay down a solid foundation for when we move beyond proof of concept.

Model examples:

After the raw Salesforce data is loaded into a dumb table with no restrictions, we load the kickoff table that looks like this:

from sqlalchemy import BigInteger, Column, ForeignKey, Text, text
from sqlalchemy.orm import relationship


class AccountIngestedData(Base):
    __tablename__ = "account_ingested_data"
    __table_args__ = {"schema": "datablender"}

    account_ingested_data_id = Column(
        BigInteger,
        primary_key=True,
        server_default=text("nextval('account_ingested_data_id_seq'::regclass)"),
    )
    salesforce_id = Column(ForeignKey(AccountRawData.id), nullable=False)
    account_data_source_id = Column(
        ForeignKey(AccountDataSource.account_data_source_id), nullable=False
    )
    account_enrichment_source_field_mapper_id = Column(
        ForeignKey(
            AccountEnrichmentSourceFieldMapper.account_enrichment_source_field_mapper_id
        ),
        nullable=False,
    )
    account_state_id = Column(ForeignKey(AccountState.account_state_id), nullable=False)
    account_experian_file_id = Column(
        ForeignKey(AccountExperianFile.account_experian_file_id), nullable=True
    )

    account_name = Column(Text, nullable=False)
    account_address1 = Column(Text, nullable=False)
    account_address2 = Column(Text, nullable=False)
    account_city = Column(Text, nullable=False)
    account_state = Column(Text, nullable=False)  # state in the postal address
    account_zip = Column(Text, nullable=False)

    account_data_source = relationship("AccountDataSource")
    account_enrichment_source_field_mapper = relationship(
        "AccountEnrichmentSourceFieldMapper"
    )
    account_experian_file = relationship("AccountExperianFile")
    # Named "state" rather than "account_state": reusing the name of the Text
    # column above would silently replace that column in the class body.
    state = relationship("AccountState")
    salesforce = relationship("AccountRawData")

And we have a state table:

class AccountState(Base):
    __tablename__ = "account_state"
    __table_args__ = {"schema": "datablender"}

    account_state_id = Column(
        BigInteger,
        primary_key=True,
        server_default=text("nextval('account_state_id_seq'::regclass)"),
    )
    account_state_description = Column(Text, nullable=False)

Example values for AccountState look like this:

100000, "ingested"
100001, "address valid"
100002, "address invalid"
100003, "address error"
100004, "address manual review"
100005, "owner info found"
100006, "owner info ambiguous"
100007, "owner info error"
100008, "owner info manual review"

Of course this list goes on and on as we add more and more integrations. At each stage of the automated pipeline, there's a query to get the records that correspond to each state.
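
To make the per-state query concrete, here is a minimal sketch of what one such lookup might look like. It is not the production code: `Account` is a hypothetical, trimmed stand-in for AccountIngestedData, `accounts_in_state` is an illustrative helper name, and the sketch assumes an in-memory SQLite database with plain Integer keys and no schema so that it runs on its own.

```python
from sqlalchemy import Column, ForeignKey, Integer, Text, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class AccountState(Base):
    __tablename__ = "account_state"
    account_state_id = Column(Integer, primary_key=True)
    account_state_description = Column(Text, nullable=False, unique=True)


class Account(Base):  # hypothetical, trimmed stand-in for AccountIngestedData
    __tablename__ = "account"
    account_id = Column(Integer, primary_key=True)
    account_name = Column(Text, nullable=False)
    account_state_id = Column(
        ForeignKey(AccountState.account_state_id), nullable=False
    )
    state = relationship(AccountState)


def accounts_in_state(session, description):
    """Return the accounts whose current state has the given description."""
    stmt = (
        select(Account)
        .join(Account.state)
        .where(AccountState.account_state_description == description)
        .order_by(Account.account_id)
    )
    return session.execute(stmt).scalars().all()


engine = create_engine("sqlite://")  # assumption: SQLite only for the demo
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        AccountState(account_state_id=100000, account_state_description="ingested"),
        AccountState(account_state_id=100001, account_state_description="address valid"),
    ])
    session.flush()  # make sure the state rows exist before the accounts
    session.add_all([
        Account(account_name="Acme", account_state_id=100000),
        Account(account_name="Globex", account_state_id=100001),
        Account(account_name="Initech", account_state_id=100000),
    ])
    session.commit()
    ingested_names = [a.account_name for a in accounts_in_state(session, "ingested")]
```

Joining on the description keeps the numeric IDs out of the pipeline code, at the cost of one join per stage query.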


My question is this: what's the cleanest way to assign the foreign key IDs to the accounts table based on what's happening?

I feel like there must be a really obvious solution to this that doesn't involve a global Python dictionary that has to be updated every time a new status is added, and also doesn't have to fire another query at the status table for every record that gets loaded. But my brain can't figure out how to make this happen.
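
One possible shape that meets both constraints, offered only as a sketch under assumptions: read the state table once per run and resolve IDs from memory after that. `StateLookup` and `id_for` are hypothetical names, `Account` is again a trimmed stand-in for AccountIngestedData, and SQLite with Integer keys is used purely so the example is self-contained.

```python
from sqlalchemy import Column, ForeignKey, Integer, Text, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class AccountState(Base):
    __tablename__ = "account_state"
    account_state_id = Column(Integer, primary_key=True)
    account_state_description = Column(Text, nullable=False, unique=True)


class Account(Base):  # hypothetical, trimmed stand-in for AccountIngestedData
    __tablename__ = "account"
    account_id = Column(Integer, primary_key=True)
    account_name = Column(Text, nullable=False)
    account_state_id = Column(
        ForeignKey(AccountState.account_state_id), nullable=False
    )


class StateLookup:
    """Hypothetical helper: one SELECT up front, then in-memory ID lookups."""

    def __init__(self, session):
        rows = session.execute(
            select(
                AccountState.account_state_description,
                AccountState.account_state_id,
            )
        ).all()
        self._ids = {description: state_id for description, state_id in rows}

    def id_for(self, description):
        try:
            return self._ids[description]
        except KeyError:
            raise LookupError(f"unknown account state: {description!r}") from None


engine = create_engine("sqlite://")  # assumption: SQLite only for the demo
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        AccountState(account_state_id=100000, account_state_description="ingested"),
        AccountState(account_state_id=100001, account_state_description="address valid"),
    ])
    session.commit()

    states = StateLookup(session)  # the only query against account_state
    session.add_all([
        Account(account_name=name, account_state_id=states.id_for("ingested"))
        for name in ("Acme", "Globex")
    ])
    session.commit()
    assigned_ids = session.execute(
        select(Account.account_state_id).order_by(Account.account_id)
    ).scalars().all()
```

Because the mapping is built from the table itself, a new status only needs a new row; the trade-off is that a long-running process would have to rebuild the lookup to see rows added after it started.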

Really curious if anyone has thoughts about this pattern. It's obviously common, I just haven't seen the SQLA implementation before.

