I'd like to optimize querying a list of Oracle tables and converting them into pandas DataFrames.
The eventual goal is to convert each table to Parquet, write it to disk, and then upload it to S3, but for now I just want to focus on the pandas / SQLAlchemy / parallelism part. My code sort of works, but it's very slow and seems to hang after completing 10 tables.
Any advice for speeding things up, or alternative approaches? (I've also put a rough sketch of the eventual S3 upload step after the code, in case it affects the design.)
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import sqlalchemy
from sqlalchemy.orm import sessionmaker, scoped_session
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed pool
from multiprocessing import Pool  # process pool (currently unused)
def process_chunk(chunk, table_name, index):
    # convert the DataFrame chunk to an Arrow table and write it to a local Parquet file
    table = pa.Table.from_pandas(chunk)
    local_file_name = "./" + table_name + "-" + str(index) + ".parquet"
    pq.write_table(table, local_file_name)
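# setup_query isn't shown here; as a rough stand-in (COLUMNS_BY_TABLE is a
# hypothetical mapping of table name -> column names, not my real code),
# it just builds the explicit column list into a SELECT:
COLUMNS_BY_TABLE = {}
def setup_query(table_name):
    columns = COLUMNS_BY_TABLE.get(table_name, ["*"])
    return "select " + ", ".join(columns) + " from " + table_name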
def process_table(table):
    db_session = DBSession()
    # helper function that creates the SQL query (select col1, col2, col3, ..., colX from table)
    query = setup_query(table)
    # is this the right way to use the db_session?
    for i, chunk in enumerate(pd.read_sql(query, db_session.bind, chunksize=300000)):
        process_chunk(chunk, table, i)
oracle_connect_str = "..."  # Oracle connection string goes here
oracle_engine = sqlalchemy.create_engine(
    oracle_connect_str,
    arraysize=10000
)

# set up session object to be used by threads
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=oracle_engine
    )
)
pool = ThreadPool(4)
table_list = ['tbl1','tbl2','tbl3','tbl4','tbl5',...,'tbl20']
# use pool.map instead of creating boiler-plate threading class
pool.map(process_table, table_list)
# are these in the right spots?
pool.close()
pool.join()
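For context, the eventual Parquet-to-S3 upload would look roughly like this (a minimal sketch using boto3; the bucket name and key layout are placeholders, not something I've settled on):

import os
import boto3

def upload_table_files(table_name, bucket="my-bucket"):  # bucket name is a placeholder
    # upload every local Parquet chunk written for this table, then delete the local copy
    s3 = boto3.client("s3")
    for file_name in os.listdir("."):
        if file_name.startswith(table_name + "-") and file_name.endswith(".parquet"):
            s3.upload_file(file_name, bucket, table_name + "/" + file_name)
            os.remove(file_name)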
Thanks!