# Register this engine strategy somewhere in your imported models
class RdsEngineStrategy(PlainEngineStrategy):
name = 'rds'
def create(self, name_or_url, **kwargs):
"""Adds an RDS-specific 'creator' for the engine connection."""
engine_url = make_url(name_or_url)
kwargs['creator'] = self._rds_engine_creator(engine_url)
return super().create(engine_url, **kwargs)
def _rds_engine_creator(self, engine_url):
instance_id, region = engine_url.host.split('.')
connector = engine_url.get_dialect().dbapi().connect
rds = boto3.client('rds', region_name=region)
if self._rds_first_instance_by_name(rds, instance_id) is None:
raise ValueError('No RDS instances for the given instance ID')
def engine_func():
instance = self._rds_first_instance_by_name(rds, instance_id)
password = rds.generate_db_auth_token(
DBHostname=instance['Endpoint']['Address'],
DBUsername=engine_url.username,
Port=instance['Endpoint']['Port'])
return connector(
host=instance['Endpoint']['Address'],
port=instance['Endpoint']['Port'],
database=engine_url.database,
user=engine_url.username,
password=password,
sslmode='require')
return engine_func
def _rds_first_instance_by_name(self, client, name):
response = client.describe_db_instances(DBInstanceIdentifier=name)
return next(iter(response['DBInstances']), None)
# Make sure to actually register it
RdsEngineStrategy()
# Caller code
engine = sqlalchemy.create_engine("postgres://us...@instance-name.region/dbname", strategy="rds")