Re: parallel pipelined table functions with cursor selecting from table(cast(SQL collection)) doesn't work
From: Frank Bergemann <FBergemann_at_web.de>
Date: Wed, 8 Jun 2011 11:52:42 -0700 (PDT)
Message-ID: <cce87615-a9aa-45ab-ab64-e1682ff8a2c1_at_k6g2000yqc.googlegroups.com>
On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem..._at_web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705..._at_k16g2000yqm.googlegroups.com...
> |i try to distribute SQL data objects - stored in a TABLE OF <SQL
> | object-Type> - to MULTIPLE (parallel) instances of a table function,
> | by passing a select CURSOR(...) to the table function, which selects
> | from the SQL TABLE OF storage via "select * from
> | TABLE(CAST(<storage> as <storage-type>)".
> |
> | But oracle always only starts a single table function instance :-( -
> | whatever hints i provide or setting i use for the parallel table
> | function (parallel_enable ...)
> |
> | Could it be, that this is due to the fact, that my data are not
> | globally available, but only in the main thread data?
> | Can someone confirm, that it's NOT possible to start multiple parallel
> | table functions for selecting on SQL data type TABLE OF <object>
> | storages?
> |
> | (It will take some time to create an example for that - but might be
> | necessary to post such here. Let me try without first).
> |
> | - thanks!
> |
> | rgds,
> | Frank
>
> Waiting for your example/test case...
>
> Regards
> Michel
sqlplus test program:
);
/
);
/
/
end;
/
Date: Wed, 8 Jun 2011 11:52:42 -0700 (PDT)
Message-ID: <cce87615-a9aa-45ab-ab64-e1682ff8a2c1_at_k6g2000yqc.googlegroups.com>
On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem..._at_web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705..._at_k16g2000yqm.googlegroups.com...
> |i try to distribute SQL data objects - stored in a TABLE OF <SQL
> | object-Type> - to MULTIPLE (parallel) instances of a table function,
> | by passing a select CURSOR(...) to the table function, which selects
> | from the SQL TABLE OF storage via "select * from
> | TABLE(CAST(<storage> as <storage-type>)".
> |
> | But oracle always only starts a single table function instance :-( -
> | whatever hints i provide or setting i use for the parallel table
> | function (parallel_enable ...)
> |
> | Could it be, that this is due to the fact, that my data are not
> | globally available, but only in the main thread data?
> | Can someone confirm, that it's NOT possible to start multiple parallel
> | table functions for selecting on SQL data type TABLE OF <object>
> | storages?
> |
> | (It will take some time to create an example for that - but might be
> | necessary to post such here. Let me try without first).
> |
> | - thanks!
> |
> | rgds,
> | Frank
>
> Waiting for your example/test case...
>
> Regards
> Michel
Here it is:
sqlplus test program:
- snip
-- SQL*Plus setup for the parallel pipelined table function test case.
-- Show DBMS_OUTPUT trace lines written by test_pkg and the driver block.
set serveroutput on;

-- Remove the objects of a previous run. On a first run these DROPs fail
-- with "does not exist" errors, which can be ignored.
-- Plain SQL statements are terminated by ';' alone: a '/' on the next line
-- would re-run the statement from the SQL*Plus buffer and raise a spurious
-- error (object no longer exists / name already used).
drop table test_table;
drop type ton_t;
-- test_list is a "table of test_obj", so it is dropped before test_obj.
drop type test_list;
drop type test_obj;

-- Physical table with the same columns as test_obj; used as the reference
-- case for which the parallel query is expected to work.
create table test_table
(
    a number(19,0),
    b timestamp with time zone,
    c varchar2(256)
);

-- Unlike plain SQL, CREATE TYPE is only ended by a '/' on a line of its own.
-- Object type mirroring one row of test_table.
create or replace type test_obj as object(
    a number(19,0),
    b timestamp with time zone,
    c varchar2(256)
);
/
-- SQL collection of test_obj: the in-memory "storage" under test.
create or replace type test_list as table of test_obj;
/
-- SQL collection of numbers, used by the driver to collect distinct SIDs.
create or replace type ton_t as table of number;
/
create or replace package test_pkg
as
    -- Batch size used by LogByCursor for its bulk fetches.
    limit_c constant integer := 4;

    -- PL/SQL record with the same fields as the SQL object type test_obj.
    type test_rec is record (
        a number(19,0),
        b timestamp with time zone,
        c varchar2(256)
    );

    -- PL/SQL collection of test_rec.
    type test_tab is table of test_rec;

    -- Strongly typed ref cursor returning test_rec rows; the type of the
    -- cursor parameter that TF is partitioned on.
    type test_cur is ref cursor return test_rec;

    -- Trace helpers: write the content of a SQL collection, a PL/SQL
    -- collection or a cursor to DBMS_OUTPUT. 'no' limits the number of
    -- entries written; -1 (the default) means no limit.
    procedure LogByList(list in test_list, no in integer default -1);
    procedure LogByTab(tab in test_tab, no in integer default -1);
    procedure LogByCursor(mycur test_cur, no in integer default -1);

    -- Pipelined table function under test: consumes 'mycur' and pipes
    -- test_obj rows; declared parallel-enabled with the input cursor
    -- hash-partitioned on column a.
    function TF(mycur test_cur)
        return test_list pipelined
        parallel_enable(partition mycur by hash(a));
end test_pkg;
/
create or replace package body test_pkg
as
    -- Writes the elements of the SQL collection 'list' to DBMS_OUTPUT.
    --   list : collection to log; a null or empty collection is reported
    --          as empty instead of raising.
    --   no   : index of the last element to log; -1 logs up to list.last.
    --          Values beyond the last element are clamped to list.last.
    procedure LogByList(list in test_list, no in integer default -1)
    is
        cnt integer := 0;
        hi  integer;
    begin
        dbms_output.put_line('LogByList(): enter');
        -- .count on an atomically null collection raises COLLECTION_IS_NULL
        if list is not null then
            cnt := list.count;
        end if;
        dbms_output.put_line('list.count = ' || cnt);
        if cnt <= 0 then
            dbms_output.put_line('LogByList(): ''list'' is empty!');
        else
            -- clamp so that a 'no' larger than the collection cannot index
            -- past the last element
            hi := case no when -1 then list.last else least(no, list.last) end;
            for i in list.first..hi loop
                dbms_output.put_line('a = ' || list(i).a || ', b = ' || list(i).b
                    || ', c = ' || list(i).c);
            end loop;
        end if;
        dbms_output.put_line('LogByList(): exit');
    end LogByList;

    -- Writes the elements of the PL/SQL collection 'tab' to DBMS_OUTPUT.
    -- Same contract as LogByList, for test_tab instead of test_list.
    procedure LogByTab(tab in test_tab, no in integer default -1)
    is
        cnt integer := 0;
        hi  integer;
    begin
        dbms_output.put_line('LogByTab(): enter');
        if tab is not null then
            cnt := tab.count;
        end if;
        dbms_output.put_line('tab.count = ' || cnt);
        if cnt <= 0 then
            dbms_output.put_line('LogByTab(): ''tab'' is empty!');
        else
            hi := case no when -1 then tab.last else least(no, tab.last) end;
            for i in tab.first..hi loop
                dbms_output.put_line('a = ' || tab(i).a || ', b = ' || tab(i).b ||
                    ', c = ' || tab(i).c);
            end loop;
        end if;
        dbms_output.put_line('LogByTab(): exit');
    end LogByTab;

    -- Fetches from the open cursor 'mycur' in batches of limit_c rows and
    -- writes each row to DBMS_OUTPUT.
    --   mycur : open cursor; it is neither opened nor closed here.
    --   no    : stop after this many rows; -1 logs all rows.
    -- The "exit" trace line is written on every path, including when the
    -- row limit ends the fetch loop early.
    procedure LogByCursor(mycur test_cur, no in integer default -1)
    is
        mytab   test_tab;
        counter integer := 0;
    begin
        dbms_output.put_line('LogByCursor( mycur, no => ''' || no || '''): enter');
        <<fetch_batches>>
        loop
            fetch mycur bulk collect into mytab limit limit_c;
            exit fetch_batches when mytab.count <= 0;
            dbms_output.put_line('logByCursor(): fetched data');
            for i in mytab.first..mytab.last loop
                dbms_output.put_line('a = ' || mytab(i).a || ', b = ' ||
                    mytab(i).b || ', c = ' || mytab(i).c);
                counter := counter + 1;
                -- leave both loops once the requested number of rows is logged
                exit fetch_batches when no != -1 and counter >= no;
            end loop;
        end loop fetch_batches;
        dbms_output.put_line('LogByCursor(): exit');
    end LogByCursor;

    -- Pipelined table function: reads every row of 'mycur' and pipes it as
    -- a test_obj.
    --   mycur : input cursor, hash-partitioned on column a when the
    --           function is run in parallel.
    --   pipes : one test_obj per input row, with b and c copied from the
    --           input row.
    -- Rows are piped while fetching, so an instance that receives no rows
    -- simply pipes nothing.
    -- NOTE(review): in the sample run the trace lines below appear only for
    -- the serial execution; presumably DBMS_OUTPUT written by parallel
    -- slave sessions does not reach the caller - confirm.
    function TF(mycur test_cur)
    return test_list pipelined
    parallel_enable(partition mycur by hash(a))
    is
        sid     number;
        counter number(19,0) := 0;
        myrec   test_rec;
    begin
        select userenv('SID') into sid from dual;
        dbms_output.put_line('test_pkg.TF( sid => ''' || sid || ''' ): enter');
        loop
            fetch mycur into myrec;
            exit when mycur%notfound;
            -- attention: saves own SID in test_obj.a for indication to caller
            -- how many sids have been involved
            pipe row(test_obj(sid, myrec.b, myrec.c));
            counter := counter + 1;
        end loop;
        dbms_output.put_line('test_pkg.TF(...): exit, piped #' || counter ||
            ' records');
        -- a pipelined function ends with a RETURN that carries no value
        return;
    end TF;
end test_pkg;
/
-- Test driver: fills a SQL collection and a physical table with the same
-- 10000 rows, exercises the trace helpers with weak and strong ref cursors,
-- then streams both sources through test_pkg.TF and prints the distinct
-- session ids (SIDs) that TF stored in column a of the rows it piped.
declare
    myList      test_list := test_list();
    myCurWeak   sys_refcursor;
    myCurStrong test_pkg.test_cur;
    myList2     test_list := test_list();
    sids        ton_t := ton_t();
begin
    for i in 1..10000 loop
        myList.extend;
        myList(myList.last) := test_obj(i, sysdate, to_char(i + 2));
    end loop;

    -- save into the real table
    insert into test_table (a, b, c)
        select tab.a, tab.b, tab.c
        from table(cast (myList as test_list)) tab;

    test_pkg.LogByList(myList, 5);

    dbms_output.put_line('1. LogByCursor with weak ''sys_refcursor'':');
    dbms_output.put_line('1.1. LogByCursor: no dynamic SQL:');
    open myCurWeak for select * from table(cast (myList as test_list));
    test_pkg.LogByCursor(myCurWeak, 5);
    close myCurWeak;

    dbms_output.put_line('1.2. LogByCursor: dynamic SQL with ''using'':');
    open myCurWeak for 'select * from table(cast (:1 as test_list))'
        using myList;
    test_pkg.LogByCursor(myCurWeak, 5);
    close myCurWeak;

    dbms_output.put_line('2. LogByCursor with strong ''test_pkg.test_cur'':');
    dbms_output.put_line('2.1. LogByCursor without ''using'':');
    open myCurStrong for select * from table(cast (myList as test_list));
    test_pkg.LogByCursor(myCurStrong, 5);
    close myCurStrong;

    --
    -- 2.2. is not possible, causes
    -- PLS-00455: cursor 'MYCURSTRONG' cannot be used in dynamic SQL OPEN statement
    -- so disabled:
    -- dbms_output.put_line('2.2. LogByCursor: dynamic SQL with ''using'':');
    -- open myCurStrong for 'select * from table(cast (:1 as test_list))' using myList;
    -- test_pkg.LogByCursor(myCurStrong);
    -- close myCurStrong;

    -- Case under investigation: the cursor passed to TF selects from the
    -- in-memory SQL collection.
    dbms_output.put_line('copy ''mylist'' to ''mylist2'' by streaming via table function...');
    select test_obj(a, b, c) bulk collect into myList2
    from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,5) */ * from
        table(cast (myList as test_list)) tab)));
    dbms_output.put_line('... saved #' || myList2.count || ' records');

    select distinct tab.a bulk collect into sids
    from table(cast (myList2 as test_list)) tab;
    -- 1..count instead of first..last: first/last are NULL for an empty
    -- result and would make the loop raise
    for i in 1..sids.count loop
        dbms_output.put_line('sid #' || sids(i));
    end loop;

    -- Reference case: the cursor passed to TF selects from the physical table.
    dbms_output.put_line('copy physical ''test_table'' to ''mylist2'' by streaming via table function:');
    select test_obj(a, b, c) bulk collect into myList2
    from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from
        test_table tab)));
    dbms_output.put_line('... saved #' || myList2.count || ' records');

    select distinct tab.a bulk collect into sids
    from table(cast (myList2 as test_list)) tab;
    for i in 1..sids.count loop
        dbms_output.put_line('sid #' || sids(i));
    end loop;
end;
/
-------------------------------------------- snap --------------------------------------------------------
==========================================================
output (the relevant part is the trailing section, which shows whether multiple threads have been started):
- snip
[...]
LogByList(): enter
list.count = 10000
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
LogByList(): exit
1. LogByCursor with weak 'sys_refcursor':
1.1. LogByCursor: no dynamic SQL:
LogByCursor( mycur, no => '5'): enter
logByCursor(): fetched data
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
logByCursor(): fetched data
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
1.2. LogByCursor: dynamic SQL with 'using':
LogByCursor( mycur, no => '5'): enter
logByCursor(): fetched data
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
logByCursor(): fetched data
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
2. LogByCursor with strong 'test_pkg.test_cur':
2.1. LogByCursor without 'using':
LogByCursor( mycur, no => '5'): enter
logByCursor(): fetched data
a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
logByCursor(): fetched data
a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
copy 'mylist' to 'mylist2' by streaming via table function...
test_pkg.TF( sid => '77' ): enter
test_pkg.TF(...): exit, piped #10000 records
... saved #10000 records
sid #77
copy physical 'test_table' to 'mylist2' by streaming via table function:
... saved #10000 records
sid #151
sid #138
sid #244
sid #155
sid #217
sid #112
sid #185
sid #248
sid #241
sid #176
PL/SQL procedure successfully completed.
- snap
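Why don't I get a list of multiple SIDs for the query on 'myList'? Only a
single SID (77) is reported there, whereas the same query on the physical
'test_table' reports ten different SIDs.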
best regards,
Frank
Received on Wed Jun 08 2011 - 13:52:42 CDT
