We've been doing pretty much exactly what you're talking about. We just wrote a small shim between airflow connections and sqlalchemy engines, which we then use inside PythonOperator tasks and our own custom tasks.
It ends up being, roughly:
def get_engine(conn_id):
connection = hooks.BaseHook.get_connection(conn_id)
connection_uri = '{c.conn_type}://{c.login}:{c.password}@{c.host}:{c.port}/{c.schema}'.format(c=connection)
return sqlalchemy.create_engine(connection_uri)
I had to remove a couple extra pieces, so I might have mistyped something, but that should be a pretty good starting point.