Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

10g: parallel pipelined table functions with cursor selecting from table(cast(SQL collection)) doesn't work

347 views
Skip to first unread message

Frank Bergemann

unread,
Jun 8, 2011, 9:00:57 AM6/8/11
to
I am trying to distribute SQL data objects - stored in a TABLE OF <SQL
object-type> - to MULTIPLE (parallel) instances of a table function
by passing a select CURSOR(...) to the table function. The cursor selects
from the SQL TABLE OF storage via "select * from
TABLE(CAST(<storage> as <storage-type>))".

But Oracle always starts only a single table function instance :-( -
whatever hints I provide or settings I use for the parallel table
function (parallel_enable ...).

Could it be that this is due to the fact that my data is not
globally available, but exists only in the main thread's data?
Can someone confirm that it's NOT possible to start multiple parallel
table functions when selecting from a SQL TABLE OF <object>
storage?

(It will take some time to create an example for that - but it might be
necessary to post one here. Let me try without it first.)

- thanks!

rgds,
Frank

Michel Cadot

unread,
Jun 8, 2011, 11:20:21 AM6/8/11
to

"Frank Bergemann" <FBerg...@web.de> a écrit dans le message de news:
25996c2b-6765-4781...@k16g2000yqm.googlegroups.com...

Waiting for your example/test case...

Regards
Michel


Frank Bergemann

unread,
Jun 8, 2011, 2:52:42 PM6/8/11
to
On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem...@web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705...@k16g2000yqm.googlegroups.com...

Here it is:

===============
sqlplus test program:
===============

-------------------------------------------- snip
--------------------------------------------------------
set serveroutput on;

-- Remove the objects of a previous run so that the script can be re-run.
-- (On the very first run these statements fail because nothing exists yet;
-- that is harmless.)
-- No "/" follows the statements: in SQL*Plus the ";" already executes a SQL
-- statement, and a following "/" would execute the buffered DROP a second
-- time, which can only fail because the object is gone by then.
-- test_list is dropped before test_obj because it is a table of test_obj.
drop table test_table;

drop type ton_t;

drop type test_list;

drop type test_obj;

-- Physical table used as the "parallelizable" row source of the test.
-- Its columns mirror the attributes of the test_obj object type.
-- The statement is terminated by ";" only: an additional "/" would re-run
-- the buffered CREATE TABLE in SQL*Plus and fail with ORA-00955.
create table test_table
(
    a number(19,0),
    b timestamp with time zone,
    c varchar2(256)
);

-- SQL object type carrying one test record (same shape as a test_table row).
create or replace type test_obj as object (
    a  number(19,0),
    b  timestamp with time zone,
    c  varchar2(256)
);
/

-- SQL collection (nested table) of test_obj; this is the in-memory storage
-- that the test selects from via table(cast(... as test_list)).
create or replace type test_list
    as table of test_obj;
/

-- SQL collection of plain numbers; the driver block collects SIDs into it.
create or replace type ton_t
    as table of number;
/

create or replace package test_pkg
as
    -- Batch size used by LogByCursor for its bulk fetches.
    limit_c constant integer := 4;

    -- PL/SQL record with the same three fields as test_obj / test_table.
    type test_rec is record (
        a  number(19,0),
        b  timestamp with time zone,
        c  varchar2(256)
    );

    -- PL/SQL nested table of test_rec.
    type test_tab is table of test_rec;

    -- Strongly typed ref cursor delivering test_rec rows.
    type test_cur is ref cursor return test_rec;

    -- Print the elements of a SQL collection; "no" limits how many
    -- elements are printed (-1 = all).
    procedure LogByList(list in test_list, no in integer default -1);

    -- Same as LogByList, but for the PL/SQL collection type.
    procedure LogByTab(tab in test_tab, no in integer default -1);

    -- Fetch from an open cursor and print the rows; "no" limits how many
    -- rows are printed (-1 = all).
    procedure LogByCursor(mycur test_cur, no in integer default -1);

    -- Pipelined table function, declared parallel-enabled with the input
    -- cursor hash-partitioned on column a.
    function TF(mycur test_cur)
        return test_list pipelined
        parallel_enable(partition mycur by hash(a));
end;
/

create or replace package body test_pkg
as
  -- Prints the elements of the SQL collection "list" via dbms_output.
  --   list : collection to print
  --   no   : number of leading elements to print; -1 (default) prints all
  procedure LogByList(list in test_list, no in integer default -1)
  is
    hi integer;
  begin
    dbms_output.put_line('LogByList(): enter');
    dbms_output.put_line('list.count = ' || list.count);

    if list.count <= 0 then
      dbms_output.put_line('LogByList(): ''list'' is empty!');
    else
      -- Clamp the upper bound to the last element: a "no" larger than the
      -- collection would otherwise read past its end.
      hi := case when no = -1 or no > list.last then list.last else no end;
      for i in list.first..hi loop
        dbms_output.put_line('a = ' || list(i).a || ', b = ' || list(i).b
                             || ', c = ' || list(i).c);
      end loop;
    end if;
    dbms_output.put_line('LogByList(): exit');
  end LogByList;

  -- Prints the elements of the PL/SQL collection "tab" via dbms_output.
  --   tab : collection to print
  --   no  : number of leading elements to print; -1 (default) prints all
  procedure LogByTab(tab in test_tab, no in integer default -1)
  is
    hi integer;
  begin
    dbms_output.put_line('LogByTab(): enter');
    dbms_output.put_line('tab.count = ' || tab.count);

    if tab.count <= 0 then
      dbms_output.put_line('LogByTab(): ''tab'' is empty!');
    else
      -- Same clamping as in LogByList.
      hi := case when no = -1 or no > tab.last then tab.last else no end;
      for i in tab.first..hi loop
        dbms_output.put_line('a = ' || tab(i).a || ', b = ' || tab(i).b
                             || ', c = ' || tab(i).c);
      end loop;
    end if;
    dbms_output.put_line('LogByTab(): exit');
  end LogByTab;

  -- Fetches from the open cursor "mycur" in batches of limit_c rows and
  -- prints every fetched row via dbms_output.
  --   mycur : open cursor; it is left open, closing it is up to the caller
  --   no    : number of rows to print; -1 (default) prints all rows
  -- When the limit "no" is reached the procedure returns immediately,
  -- i.e. without printing the trailing "exit" line.
  procedure LogByCursor(mycur test_cur, no in integer default -1)
  is
    mytab   test_tab;
    counter integer := 0;
  begin
    dbms_output.put_line('LogByCursor( mycur, no => ''' || no || '''): enter');
    loop
      fetch mycur bulk collect into mytab limit limit_c;
      -- With LIMIT the last batch may be partial, so the loop ends on an
      -- empty batch rather than on %NOTFOUND.
      exit when mytab.count <= 0;
      dbms_output.put_line('logByCursor(): fetched data');
      for i in 1..mytab.count loop
        dbms_output.put_line('a = ' || mytab(i).a || ', b = ' || mytab(i).b
                             || ', c = ' || mytab(i).c);
        counter := counter + 1;
        if no != -1 and counter >= no then
          return;
        end if;
      end loop;
    end loop;
    dbms_output.put_line('LogByCursor(): exit');
  end LogByCursor;

  -- Pipelined table function: reads all rows of "mycur" and pipes one
  -- test_obj per input row.
  --   mycur : input cursor; declared to be hash-partitioned on column a
  --           across parallel instances of this function
  -- Attention: the piped objects do NOT carry the input value of "a".
  -- Attribute "a" is overwritten with the SID of the executing session, so
  -- the caller can see how many sessions have been involved.
  -- Rows are piped while they are fetched, so an instance that receives no
  -- rows simply pipes nothing.
  function TF(mycur test_cur)
    return test_list pipelined
    parallel_enable(partition mycur by hash(a))
  is
    sid     number;
    counter number(19,0) := 0;
    myrec   test_rec;
  begin
    select userenv('SID') into sid from dual;
    dbms_output.put_line('test_pkg.TF( sid => ''' || sid || ''' ): enter');

    loop
      fetch mycur into myrec;
      exit when mycur%notfound;
      pipe row(test_obj(sid, myrec.b, myrec.c));
      counter := counter + 1;
    end loop;
    close mycur;

    dbms_output.put_line('test_pkg.TF(...): exit, piped #' || counter ||
                         ' records');
    -- A pipelined function ends with a RETURN that carries no value.
    return;
  end TF;
end;
/

-- Test driver: fills an in-memory collection and a physical table with the
-- same 10000 rows, exercises the logging helpers, and then streams both row
-- sources through test_pkg.TF, printing the distinct SIDs that TF reported.
declare
  myList      test_list := test_list();
  myList2     test_list := test_list();

  myCurWeak   sys_refcursor;
  myCurStrong test_pkg.test_cur;

  sids        ton_t := ton_t();
begin
  for i in 1..10000 loop
    myList.extend;
    myList(myList.last) := test_obj(i, sysdate, to_char(i + 2));
  end loop;

  -- save into the real table
  insert into test_table (a, b, c)
    select t.a, t.b, t.c
    from table(cast (myList as test_list)) t;

  test_pkg.LogByList(myList, 5);
  dbms_output.put_line('1. LogByCursor with weak ''sys_refcursor'':');

  dbms_output.put_line('1.1. LogByCursor: no dynamic SQL:');
  open myCurWeak for select * from table(cast (myList as test_list));
  test_pkg.LogByCursor(myCurWeak, 5);
  close myCurWeak;

  dbms_output.put_line('1.2. LogByCursor: dynamic SQL with ''using'':');
  open myCurWeak for 'select * from table(cast (:1 as test_list))'
    using myList;
  test_pkg.LogByCursor(myCurWeak, 5);
  close myCurWeak;

  dbms_output.put_line('2. LogByCursor with strong ''test_pkg.test_cur'':');

  dbms_output.put_line('2.1. LogByCursor without ''using'':');
  open myCurStrong for select * from table(cast (myList as test_list));
  test_pkg.LogByCursor(myCurStrong, 5);
  close myCurStrong;

  -- 2.2. (dynamic SQL with 'using' on the strong cursor) is not possible.
  -- It causes
  --   PLS-00455: cursor 'MYCURSTRONG' cannot be used in dynamic SQL OPEN statement
  -- so it is disabled. The attempted statement was:
  --   open myCurStrong for 'select * from table(cast (:1 as test_list))' using myList;

  dbms_output.put_line('copy ''mylist'' to ''mylist2'' by streaming via table function...');
  select test_obj(a, b, c) bulk collect into myList2
  from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,5) */ * from
    table(cast (myList as test_list)) tab)));
  dbms_output.put_line('... saved #' || myList2.count || ' records');
  -- TF stores the SID of the executing session in attribute a, so the
  -- distinct values of a are the sessions that took part.
  select distinct(tab.a) bulk collect into sids
  from table(cast (myList2 as test_list)) tab;
  -- 1..count instead of first..last: first/last are NULL for an empty result
  for i in 1..sids.count loop
    dbms_output.put_line('sid #' || sids(i));
  end loop;

  dbms_output.put_line('copy physical ''test_table'' to ''mylist2'' by streaming via table function:');
  select test_obj(a, b, c) bulk collect into myList2
  from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from
    test_table tab)));
  dbms_output.put_line('... saved #' || myList2.count || ' records');
  select distinct(tab.a) bulk collect into sids
  from table(cast (myList2 as test_list)) tab;
  for i in 1..sids.count loop
    dbms_output.put_line('sid #' || sids(i));
  end loop;

end;
/
-------------------------------------------- snap
--------------------------------------------------------

==========================================================
output (relevant is the trailing part showing, if multiple threads
have been started):
==========================================================

-------------------------------------------- snip
--------------------------------------------------------
[...]
LogByList(): enter
list.count = 10000
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
LogByList(): exit
1. LogByCursor with weak 'sys_refcursor':
1.1. LogByCursor: no dynamic SQL:
LogByCursor( mycur, no => '5'): enter
logByCursor(): fetched data
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
logByCursor(): fetched data
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
1.2. LogByCursor: dynamic SQL with 'using':
LogByCursor( mycur, no => '5'): enter
logByCursor(): fetched data
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
logByCursor(): fetched data
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
2. LogByCursor with strong 'test_pkg.test_cur':
2.1. LogByCursor without 'using':
LogByCursor( mycur, no => '5'): enter
logByCursor(): fetched data
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
logByCursor(): fetched data
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
copy 'mylist' to 'mylist2' by streaming via table function...
test_pkg.TF( sid => '77' ): enter
test_pkg.TF(...): exit, piped #10000 records
... saved #10000 records
sid #77
copy physical 'test_table' to 'mylist2' by streaming via table
function:
... saved #10000 records
sid #151
sid #138
sid #244
sid #155
sid #217
sid #112
sid #185
sid #248
sid #241
sid #176

PL/SQL procedure successfully completed.

-------------------------------------------- snap
--------------------------------------------------------

I don't get a list of multiple SIDs for the query on the 'myList'
collection - only for the query on the physical 'test_table'.

???

best regards,
Frank

Frank Bergemann

unread,
Jun 9, 2011, 4:42:03 AM6/9/11
to
On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem...@web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705...@k16g2000yqm.googlegroups.com...

example once again - but this time cut back to the essentials:

=====================
sqlplus test program:
=====================

-- Remove the objects of a previous run so that the script can be re-run.
-- No "/" follows the statements: in SQL*Plus the ";" already executes a SQL
-- statement, and a following "/" would execute the buffered DROP a second
-- time, which can only fail because the object is gone by then.
drop table test_table;

drop type ton_t;

drop type test_list;

drop type test_obj;

-- NOTE(review): this fragment begins inside a package specification; the
-- statement header and whatever preceded it in the script are not visible
-- here. Presumably they match the full listing posted earlier in the
-- thread - confirm before running this script.

-- PL/SQL record with the fields a, b and c.
type test_rec is record (
a number(19,0),
b timestamp with time zone,
c varchar2(256)
);
-- PL/SQL nested table of test_rec.
type test_tab is table of test_rec;

-- Strongly typed ref cursor delivering test_rec rows.
type test_cur is ref cursor return test_rec;

-- Pipelined table function, declared parallel-enabled with the input cursor
-- hash-partitioned on column a.
function TF(mycur test_cur)


return test_list pipelined
parallel_enable(partition mycur by hash(a));
end;
/

-- NOTE(review): only the tail of this package body is visible: the closing
-- log statement of a subprogram (presumably TF - it prints sid and counter)
-- followed by the two closing "end;". The rest of the body is missing, so
-- this statement cannot be compiled as shown.
create or replace package body test_pkg
as

dbms_output.put_line('test_pkg.TF( sid => '''|| sid || ''' ): exit,


piped #' || counter || ' records');
end;
end;
/

-- Test driver: fills an in-memory collection and a physical table with the
-- same 10000 rows, streams both row sources through test_pkg.TF, and prints
-- the distinct values of attribute a found in the result as the worker
-- sessions' SID list.
declare
  myList  test_list := test_list();
  myList2 test_list := test_list();
  sids    ton_t := ton_t();
begin
  for i in 1..10000 loop
    myList.extend;
    myList(myList.last) := test_obj(i, sysdate, to_char(i + 2));
  end loop;

  -- save into the real table
  insert into test_table (a, b, c)
    select t.a, t.b, t.c
    from table(cast (myList as test_list)) t;

  dbms_output.put_line(chr(10) || 'copy ''mylist'' to ''mylist2'' by streaming via table function...');
  select test_obj(a, b, c) bulk collect into myList2
  from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from
    table(cast (myList as test_list)) tab)));
  dbms_output.put_line('... saved #' || myList2.count || ' records');
  select distinct(tab.a) bulk collect into sids
  from table(cast (myList2 as test_list)) tab;

  dbms_output.put_line('worker thread''s sid list:');
  -- 1..count instead of first..last: first/last are NULL for an empty result
  for i in 1..sids.count loop
    dbms_output.put_line('sid #' || sids(i));
  end loop;

  dbms_output.put_line(chr(10) || 'copy physical ''test_table'' to ''mylist2'' by streaming via table function:');
  select test_obj(a, b, c) bulk collect into myList2
  from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from
    test_table tab)));
  dbms_output.put_line('... saved #' || myList2.count || ' records');
  select distinct(tab.a) bulk collect into sids
  from table(cast (myList2 as test_list)) tab;

  dbms_output.put_line('worker thread''s sid list:');
  for i in 1..sids.count loop
    dbms_output.put_line('sid #' || sids(i));
  end loop;

end;
/
-------------------------------------------- snap
--------------------------------------------------------

=======
output:
=======

-------------------------------------------- snip
--------------------------------------------------------
[...]


copy 'mylist' to 'mylist2' by streaming via table function...

test_pkg.TF( sid => '80' ): enter
test_pkg.TF( sid => '80' ): exit, piped #10000 records
... saved #10000 records
worker thread's sid list:
sid #80

copy physical 'test_table' to 'mylist2' by streaming via table
function:
... saved #10000 records

worker thread's sid list:
sid #244
sid #77
sid #222
sid #253
sid #95
sid #96
sid #74
sid #136
sid #142
sid #79

Randolf Geist

unread,
Jun 9, 2011, 6:12:12 AM6/9/11
to
On Jun 8, 9:00 am, Frank Bergemann <FBergem...@web.de> wrote:
> i try to distribute SQL data objects - stored in a TABLE OF <SQL
> object-Type> - to MULTIPLE (parallel) instances of a table function,
> by passing a select CURSOR(...) to the table function, which selects
> from the  SQL TABLE OF storage  via "select * from
> TABLE(CAST(<storage> as <storage-type>)".
>
> Could it be, that this is due to the fact, that my data are not
> globally available, but only in the main thread data?
> Can someone confirm, that it's NOT possible to start multiple parallel
> table functions for selecting on SQL data type TABLE OF <object>
> storages?

It is very likely that Oracle requires a "parallelized" source for the
parallel_enabled table function to be executed in parallel, so a
"serial" to "parallel" distribution is probably not supported out of
the box in such a case.

I have kind of a deja-vu - haven't we discussed a similar issue raised
by you not too long ago on this list?

Hope this helps,
Randolf

Oracle related stuff blog:
http://oracle-randolf.blogspot.com/

Co-author of the "OakTable Expert Oracle Practices" book:
http://www.apress.com/book/view/1430226684
http://www.amazon.com/Expert-Oracle-Practices-Database-Administration/dp/1430226684

Frank Bergemann

unread,
Jun 9, 2011, 7:34:30 AM6/9/11
to
> Co-author of the "OakTable Expert Oracle Practices" book:http://www.apress.com/book/view/1430226684http://www.amazon.com/Expert-Oracle-Practices-Database-Administration...

Hi Randolf,

no, this here is a new issue (for me).

I had problems with dynamic SQL cooperating with table functions.
I solved that by dynamically creating static SQL objects (with code)
and delegating to those.
See this: "dynamically created cursor doesn't work for parallel
pipelined functions"
(http://groups.google.com/group/comp.databases.oracle.server/
browse_frm/thread/c139fe298bcb2c4f/734468085369e6af?
hl=de&lnk=gst&q=dynamically+created+cursor#734468085369e6af)

And we both had another discussion about "10g: chaining pipelined
functions is slow "
(http://groups.google.com/group/comp.databases.oracle.server/
browse_frm/thread/4f668e23ba263e21/7a09ea4fa4c986cb?
hl=de&lnk=gst&q=bergemann#7a09ea4fa4c986cb).
For that you pointed me to AQ.
However i need to check for this one again - if the slow execution,
which i recognized, wasn't due to missing /*+ first_rows */ hint
respectively avoiding things like 'order by' in the base select and
'cluster' for the table function.

This entry here is for setting up parallel pipelined table function
for a SQL select done on a SQL collection data type (via
TABLE(CAST(... as ...))).

best regards,
Frank

Frank Bergemann

unread,
Jun 9, 2011, 8:00:04 AM6/9/11
to
On 9 Jun., 12:12, Randolf Geist <mah...@web.de> wrote:
> Co-author of the "OakTable Expert Oracle Practices" book:http://www.apress.com/book/view/1430226684http://www.amazon.com/Expert-Oracle-Practices-Database-Administration...

Please see my 'sqlplus' example in entry #4 of this thread.
The problem actually results from another issue with table
functions:

My application has a two-stage data selection.
A 1st select for minimal context base data - mainly to evaluate which
driving data records are DUE.
And a 2nd select for all the "real" data to process a context (joining
many more tables here, which I don't want to do for non-due
records).
So it's doing the stage #1 select first, and then the stage #2 select -
based on the stage #1 results - next.

The first implementation of the application did the stage #1 select in
the main session of the pl/sql code.
And for the stage #2 select there was done a dispatch to multiple
parallel table functions (in multiple worker sessions) for the "real
work".
That worked.

However, there was a flaw:
Between the records from the stage #1 selection and the records from the
stage #2 selection there is a 1:n relation (via a key / foreign key
relation).
That means that for one resulting record from the stage #1 selection,
there are x records from the stage #2 selection.
That forced me to use "cluster curStage2 by (theKey)",
because the worker sessions need to evaluate the overall status for a
context of one record from stage #1 and x records from stage #2 (so a
worker needs to have all x records of stage #2 together).
This then resulted in a delay when starting up the worker sessions (I
didn't find a way to get rid of this).

So I wanted to shift the invocation of the worker sessions to the
stage #1 selection.
Then I don't need the "cluster curStage2 by (theKey)" anymore!
But: I also need to do an update of the primary driving data!
So the stage #1 select is a 'select ... for update ...'.
But you can't use such a select in a CURSOR for table functions (and I
can understand why that's not possible).
So I have to do my stage #1 selection in two steps:

1. 'select for update' by main session and collect result in SQL
collection.
2. pass collected data to parallel table functions

And for 2. i recognized, that it doesn't start up multiple parallel
table function instances.

As a work-around - if it's just not possible to start multiple
parallel pipelined table functions when dispatching from 'select * from
TABLE(CAST(... as ...))' - I need to select again from the base tables,
driven by the SQL collection data.
But before I do so, I wanted to verify whether it's really not possible.
Maybe I'm just missing a special Oracle hint or whatever you can get "out
of another box" :-)

regards,
Frank

Randolf Geist

unread,
Jun 10, 2011, 3:42:14 PM6/10/11
to
On Jun 9, 2:00 pm, Frank Bergemann <FBergem...@web.de> wrote:
> Pls. see my 'sqlplus' example for entry #4 of this thread.

I saw your example, and what I've replied still stands: you probably
won't be able to parallelize your table function, because the TABLE()
expression that uses your in-memory collection as its source cannot be
executed in parallel. Oracle obviously does not support a serial to
parallel operation here, in the sense that there is no parallel row
source that could drive your table function in parallel.

> This then resulted in delay for starting up the worker sessions (i
> didn't find a way to get rid of this).

There is no magic involved - Oracle has to "cluster" (or partition/
sort) the input row source for the table function according to your
function declaration, so on a larger data set this operation may take
some time. I assume that this delay is significant compared to the
overall elapsed time?

> But: i also need to do an update of the primary driving data!
> So the stage #1 select is a 'select ... for update ...'.
> But you can't use such in CURSOR for table functions (which i can
> understand, why it's not possible).
> So i have to do my stage #1 selection in two steps:

I can't remember if we have discussed the details of what you're
actually trying to achieve, but I can't help the feeling that you seem
to try to re-invent the wheel at least partially. Wouldn't it be
possible to implement your processing using pure SQL? Using (possibly
multiple) parallel direct-path inserts and exchange partition
techniques (if you have enterprise edition + partitioning available)
should allow you to perform complex transformations while making the
most of your available hardware. Always remember that "the fastest
update/delete operation is an insert" in many cases where large data
sets need to be processed/manipulated.

Besides that I believe that the workaround that you've mentioned
should allow you to run your second step in parallel, however it
depends of course on the overhead that you introduce by combining the
data from step one with step two how efficient this will be.

Hope this helps,
Randolf

0 new messages