Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

ETL code in Oracle

15 views
Skip to first unread message

Yogesh Palrecha

unread,
Feb 16, 2023, 8:26:32 AM2/16/23
to
Trying to run this ETL code in Oracle. Not sure if this is a good design or not. We are trying to build an ETL framework in Oracle that can handle parallel processing and exception handling.

-- Batch configuration: one row per id range that a run of the process works through.
CREATE TABLE prc_run_config (
    prc_run_config_id  INTEGER,        -- identifier of the configured range
    prc_name           VARCHAR2(100),  -- name of the process the range belongs to
    prc_start_range    INTEGER,        -- first id of the range (inclusive)
    prc_end_range      INTEGER,        -- last id of the range (inclusive)
    prc_created_time   TIMESTAMP       -- when the configuration row was written
);

-- Run log: one row per execution of a configured range.
CREATE TABLE prc_run_log (
    prc_run_log_id     INTEGER,         -- identifier of the log row
    prc_run_config_id  INTEGER,         -- prc_run_config row that was executed
    prc_start_time     TIMESTAMP,       -- when the run started
    prc_end_time       TIMESTAMP,       -- when the run finished (NULL while running)
    prc_status         VARCHAR2(100),   -- 'STARTED', 'COMPLETED' or 'FAILED'
    prc_run_sql        VARCHAR2(4000),  -- SQL text of the run, if recorded
    prc_run_msg        VARCHAR2(4000)   -- progress or error message
);

-- Lookup of parties to process, keyed by party_id.
-- The terminating semicolon is required: without it the statement runs into
-- whatever follows it in the script.
CREATE TABLE party_lkp (
    party_id   INTEGER,
    party_num  VARCHAR2(100)
);

-- Seed party_lkp with 100 sample rows: ids 1..100, each with a generated GUID.
BEGIN
    FOR i IN 1 .. 100
    LOOP
        -- Explicit column list so the insert does not depend on column order;
        -- RAWTOHEX spells out the RAW -> VARCHAR2 conversion of SYS_GUID().
        INSERT INTO party_lkp (party_id, party_num)
        VALUES (i, RAWTOHEX(SYS_GUID()));
    END LOOP;
END;
/
-- Inspect the seeded lookup rows (statement terminator added).
SELECT rownum, party_id, party_num
FROM party_lkp;

--proc_gen_run_config (prc_name, parallel_threads)
-- Splits the rows of party_lkp into contiguous ranges and writes one
-- prc_run_config row per range.
--
-- Ranges are positions 1..COUNT(*), so they only line up with party_id when the
-- ids are dense and start at 1 (true for the seeded sample data).
DECLARE
    c_prc_name          CONSTANT VARCHAR2(100) := 'LOAD_CORE_TBL';
    c_parallel_threads  CONSTANT INT           := 10;
    v_total             INT;
    v_batch_size        INT;
    v_start_range       INT;
    v_end_range         INT;
BEGIN
    SELECT COUNT(*) INTO v_total FROM party_lkp;

    -- CEIL keeps the batch size a whole number and large enough that the
    -- configured number of batches covers every row, including the remainder
    -- when the count is not an exact multiple of the thread count.
    v_batch_size := CEIL(v_total / c_parallel_threads);

    FOR i IN 1 .. c_parallel_threads
    LOOP
        v_start_range := (i - 1) * v_batch_size + 1;

        -- Fewer rows than threads (or no rows at all): stop rather than
        -- emitting empty or out-of-range batches.
        EXIT WHEN v_start_range > v_total;

        -- The final batch is clipped to the last existing row.
        v_end_range := LEAST(i * v_batch_size, v_total);

        INSERT INTO prc_run_config
            (prc_run_config_id, prc_name, prc_start_range, prc_end_range, prc_created_time)
        VALUES
            (i, c_prc_name, v_start_range, v_end_range, SYSTIMESTAMP);
    END LOOP;
END;
/

-- Scratch statements: empty the configuration table, then inspect it.
TRUNCATE TABLE prc_run_config;

SELECT prc_run_config_id, prc_name, prc_start_range, prc_end_range, prc_created_time
FROM prc_run_config;

--proc_run_prc (prc_run_config_id)
-- Runs one configured range: reads its bounds from prc_run_config, writes a
-- STARTED row to prc_run_log, prints every party in the range, then marks the
-- log row COMPLETED. Any error marks the run FAILED with the Oracle message.
DECLARE
    -- Ids are fixed for this sample run; the header comment suggests the config
    -- id is meant to become a parameter.
    c_run_config_id  CONSTANT INT := 1;
    c_run_log_id     CONSTANT INT := 1;
    v_start_range    INT;
    v_end_range      INT;
    v_party_num      VARCHAR2(100);
    v_sql_error      VARCHAR2(1000);
BEGIN
    SELECT prc_start_range, prc_end_range
    INTO v_start_range, v_end_range
    FROM prc_run_config
    WHERE prc_run_config_id = c_run_config_id;

    INSERT INTO prc_run_log
        (prc_run_log_id, prc_run_config_id, prc_start_time, prc_end_time,
         prc_status, prc_run_sql, prc_run_msg)
    VALUES
        (c_run_log_id, c_run_config_id, SYSTIMESTAMP, NULL,
         'STARTED', NULL, 'Processing started');

    FOR i IN v_start_range .. v_end_range
    LOOP
        -- Raises NO_DATA_FOUND (and fails the run) when no party has this id.
        SELECT party_num INTO v_party_num FROM party_lkp WHERE party_id = i;
        DBMS_OUTPUT.PUT_LINE('Processing party ' || v_party_num || '...');
    END LOOP;

    UPDATE prc_run_log
    SET prc_end_time = SYSTIMESTAMP,
        prc_status   = 'COMPLETED',
        prc_run_msg  = 'Processing completed'
    WHERE prc_run_log_id = c_run_log_id;
EXCEPTION
    WHEN OTHERS THEN
        -- SQLERRM cannot be used directly inside a SQL statement, so capture it first.
        v_sql_error := SQLERRM;
        DBMS_OUTPUT.PUT_LINE(v_sql_error);

        UPDATE prc_run_log
        SET prc_end_time = SYSTIMESTAMP,
            prc_status   = 'FAILED',
            prc_run_msg  = v_sql_error
        WHERE prc_run_log_id = c_run_log_id;

        -- If the failure happened before the STARTED row was written (for
        -- example the config row is missing), the UPDATE matches nothing;
        -- record the failure as a new row so it is not lost.
        IF SQL%ROWCOUNT = 0 THEN
            INSERT INTO prc_run_log
                (prc_run_log_id, prc_run_config_id, prc_start_time, prc_end_time,
                 prc_status, prc_run_sql, prc_run_msg)
            VALUES
                (c_run_log_id, c_run_config_id, SYSTIMESTAMP, SYSTIMESTAMP,
                 'FAILED', NULL, v_sql_error);
        END IF;
END;
/

-- Fetch the second party. A bare "WHERE rownum = 2" never returns a row in
-- Oracle: ROWNUM is assigned as rows pass the filter, so the first candidate is
-- always numbered 1, fails the test, and the counter never advances. Number the
-- rows in a subquery first and filter on that number instead.
SELECT rn, party_num
FROM (
    SELECT ROW_NUMBER() OVER (ORDER BY party_id) AS rn,
           party_num
    FROM party_lkp
)
WHERE rn = 2;

-- Scratch statements: empty the run log, then inspect it.
TRUNCATE TABLE prc_run_log;

SELECT prc_run_log_id, prc_run_config_id, prc_start_time, prc_end_time,
       prc_status, prc_run_sql, prc_run_msg
FROM prc_run_log;

Yogesh Palrecha

unread,
Feb 22, 2023, 6:00:32 PM2/22/23
to
-- Batch configuration: each row describes one id range to be processed by a run.
CREATE TABLE prc_run_config (
    prc_run_config_id  INTEGER,        -- identifier of the configured range
    prc_name           VARCHAR2(100),  -- process the range belongs to
    prc_start_range    INTEGER,        -- lower bound of the range (inclusive)
    prc_end_range      INTEGER,        -- upper bound of the range (inclusive)
    prc_created_time   TIMESTAMP       -- time the row was generated
);

-- Run summary: one row per execution of a configured range.
CREATE TABLE prc_run_log_summary (
    prc_run_log_id     INTEGER,        -- identifier of the run
    prc_run_config_id  INTEGER,        -- prc_run_config row that was executed
    prc_start_time     TIMESTAMP,      -- when the run started
    prc_end_time       TIMESTAMP,      -- when the run ended (NULL while running)
    prc_status         VARCHAR2(100),  -- 'STARTED', 'COMPLETED' or 'FAILED'
    prc_msg            VARCHAR2(4000)  -- progress or error message
);

-- Run details: individual events recorded while a run is in progress.
CREATE TABLE prc_run_log_details (
    prc_run_log_id     INTEGER,         -- run the event belongs to
    prc_run_config_id  INTEGER,         -- configured range being executed
    prc_timestamp      TIMESTAMP,       -- when the event was recorded
    prc_sql            VARCHAR2(4000),  -- SQL text involved in the event
    prc_msg            VARCHAR2(4000)   -- message text, e.g. the Oracle error
);

-- Source table of parties to process.
-- The CREATE needs its own terminator; without it the TRUNCATE below is read as
-- part of the same statement.
CREATE TABLE party (
    party_id   INTEGER,
    party_key  VARCHAR2(100)
);

TRUNCATE TABLE party;

-- Seed party with 111 sample rows: ids 1..111, each with a generated GUID key.
BEGIN
    FOR i IN 1 .. 111
    LOOP
        -- Explicit column list so the insert does not depend on column order;
        -- RAWTOHEX spells out the RAW -> VARCHAR2 conversion of SYS_GUID().
        INSERT INTO party (party_id, party_key)
        VALUES (i, RAWTOHEX(SYS_GUID()));
    END LOOP;
END;
/
-- Inspect the seeded rows. The party table has party_key, not party_num.
SELECT rownum, party_id, party_key
FROM party;

-- Work queue: one row per party with its processing state.
-- The CREATE needs its own terminator; without it the TRUNCATE below is read as
-- part of the same statement.
CREATE TABLE prc_party_status (
    prc_id      INTEGER,        -- position of the party in the queue
    prc_status  VARCHAR2(100),  -- processing state, e.g. 'PENDING'
    party_key   VARCHAR2(100)   -- key of the party to process
);

TRUNCATE TABLE prc_party_status;

-- Queue every party as PENDING, numbering them 1..N in party_id order so the
-- numbers are dense even if party_id has gaps.
INSERT INTO prc_party_status (prc_id, prc_status, party_key)
SELECT ROW_NUMBER() OVER (ORDER BY party_id),
       'PENDING',
       party_key
FROM party;

SELECT prc_id, prc_status, party_key
FROM prc_party_status;
-- 111  (bare number pasted between the queries; not SQL, presumably the row count)

SELECT ROUND(111/10) FROM dual;
--proc_gen_run_config (prc_name, v_batch_size)
-- Splits the queued parties into ranges of at most v_batch_size positions and
-- writes one prc_run_config row per range.
DECLARE
    c_prc_name     CONSTANT VARCHAR2(100) := 'LOAD_CORE_TBL';
    v_total        INT;
    v_start_range  INT;
    v_end_range    INT;
    v_loop_count   INT;
    v_batch_size   INT;
BEGIN
    v_batch_size := 25;

    -- Count the queue table whose prc_id values the ranges index; the runner
    -- block reads prc_party_status by prc_id.
    SELECT COUNT(*) INTO v_total FROM prc_party_status;

    -- CEIL, not ROUND: a partial final batch still needs its own range, and a
    -- queue smaller than half a batch must not round down to zero batches.
    v_loop_count := CEIL(v_total / v_batch_size);

    FOR i IN 1 .. v_loop_count
    LOOP
        v_start_range := (i - 1) * v_batch_size + 1;
        -- Clip the last range to the final queued position.
        v_end_range := LEAST(i * v_batch_size, v_total);

        INSERT INTO prc_run_config
            (prc_run_config_id, prc_name, prc_start_range, prc_end_range, prc_created_time)
        VALUES
            (i, c_prc_name, v_start_range, v_end_range, SYSTIMESTAMP);
    END LOOP;
END;
/

-- Scratch statements: empty the configuration table, then inspect it.
TRUNCATE TABLE prc_run_config;

SELECT prc_run_config_id, prc_name, prc_start_range, prc_end_range, prc_created_time
FROM prc_run_config;

--proc_run_prc (prc_run_config_id)
-- Runs one configured range: reads its bounds from prc_run_config, writes a
-- STARTED row to prc_run_log_summary, then walks prc_party_status over the
-- range. PENDING parties are processed and marked SUCCESS or FAILED (failures
-- are also written to prc_run_log_details); other parties are skipped. The
-- summary row ends as COMPLETED, or FAILED if an error escapes the loop.
DECLARE
v_start_range INT;
v_end_range INT;
v_prc_status VARCHAR2(100);
v_party_key VARCHAR2(100);
v_sql_error VARCHAR2(1000);
v_sql VARCHAR2(4000);
v_run_config_id INT;
v_prc_run_log_id INT;
BEGIN
-- Range to run; hard-coded here, the header comment suggests it is meant to be a parameter.
v_run_config_id := 1;
SELECT prc_start_range,prc_end_range INTO v_start_range,v_end_range FROM PRC_RUN_CONFIG WHERE prc_run_config_id=v_run_config_id;
-- Next log id = highest existing id + 1, or 1 when the summary table is empty.
-- NOTE(review): two sessions running concurrently could read the same MAX and
-- reuse an id -- confirm whether a sequence is intended for parallel runs.
SELECT NVL(MAX(prc_run_log_id)+1,1) INTO v_prc_run_log_id FROM prc_run_log_summary;
INSERT INTO prc_run_log_summary VALUES (v_prc_run_log_id,v_run_config_id,SYSTIMESTAMP,NULL,'STARTED','Processing started');
-- Persist the STARTED row before any per-party work begins.
COMMIT;
FOR i IN v_start_range..v_end_range
LOOP
-- No queue row with this prc_id raises NO_DATA_FOUND, which is outside the
-- inner block and therefore ends the whole run via the outer handler.
SELECT party_key,prc_status INTO v_party_key,v_prc_status FROM prc_party_status WHERE prc_id = i;
IF v_prc_status = 'PENDING' THEN
DBMS_OUTPUT.PUT_LINE('Processing party ' || v_party_key || '...');
-- Inner block: an error for one party is logged and the loop carries on.
BEGIN
-- NOTE(review): looks like a placeholder for the real load statement. Oracle
-- documents that EXECUTE IMMEDIATE of a SELECT without an INTO clause is never
-- executed -- confirm before relying on this as a test.
v_sql := 'SELECT 1 FROM TBL';
EXECUTE IMMEDIATE v_sql;
UPDATE prc_party_status SET prc_status='SUCCESS' WHERE prc_id = i;
-- One commit per party, so earlier successes survive a later failure.
COMMIT;
EXCEPTION
WHEN OTHERS THEN
-- Discard this party's uncommitted work before recording the failure.
ROLLBACK;
DBMS_OUTPUT.PUT_LINE (SQLERRM);
-- Copy SQLERRM into a variable so it can be used in the INSERT below.
v_sql_error := SQLERRM;
UPDATE prc_party_status SET prc_status='FAILED' WHERE prc_id = i;
INSERT INTO prc_run_log_details VALUES (v_prc_run_log_id,v_run_config_id,SYSTIMESTAMP,v_sql,v_sql_error);
COMMIT;
END;
ELSE
DBMS_OUTPUT.PUT_LINE('Skipping party ' || v_party_key || '...');
END IF;
END LOOP;
-- Reached whenever the loop finishes, even if individual parties failed.
UPDATE prc_run_log_summary SET prc_end_time=SYSTIMESTAMP, prc_status='COMPLETED', prc_msg='Processing completed' WHERE prc_run_log_id=v_prc_run_log_id;
COMMIT;
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
DBMS_OUTPUT.PUT_LINE (SQLERRM);
v_sql_error := SQLERRM;
-- NOTE(review): if the error occurred before v_prc_run_log_id was assigned, the
-- variable is NULL, this UPDATE matches no row, and the failure is visible only
-- through DBMS_OUTPUT.
UPDATE prc_run_log_summary SET prc_end_time=SYSTIMESTAMP, prc_status='FAILED', prc_msg=v_sql_error WHERE prc_run_log_id=v_prc_run_log_id;
COMMIT;
END;
/

-- Scratch statements for resetting and inspecting state between test runs.
TRUNCATE TABLE prc_run_log_summary;
TRUNCATE TABLE prc_run_log_details;

SELECT prc_run_log_id, prc_run_config_id, prc_start_time, prc_end_time, prc_status, prc_msg
FROM prc_run_log_summary;

SELECT prc_run_log_id, prc_run_config_id, prc_timestamp, prc_sql, prc_msg
FROM prc_run_log_details;

SELECT prc_id, prc_status, party_key
FROM prc_party_status;

-- Intentionally unfiltered: puts every party back to PENDING, the only state
-- the runner block processes.
UPDATE prc_party_status
SET prc_status = 'PENDING';
0 new messages