Re: parallel pipelined table functions with cursor selecting from table(cast(SQL collection)) doesn't work
From: Frank Bergemann <FBergemann_at_web.de>
Date: Wed, 8 Jun 2011 11:52:42 -0700 (PDT)
Message-ID: <cce87615-a9aa-45ab-ab64-e1682ff8a2c1_at_k6g2000yqc.googlegroups.com>
On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem..._at_web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705..._at_k16g2000yqm.googlegroups.com...
> |i try to distribute SQL data objects - stored in a TABLE OF <SQL
> | object-Type> - to MULTIPLE (parallel) instances of a table function,
> | by passing a select CURSOR(...) to the table function, which selects
> | from the SQL TABLE OF storage via "select * from
> | TABLE(CAST(<storage> as <storage-type>)".
> |
> | But oracle always only starts a single table function instance :-( -
> | whatever hints i provide or setting i use for the parallel table
> | function (parallel_enable ...)
> |
> | Could it be, that this is due to the fact, that my data are not
> | globally available, but only in the main thread data?
> | Can someone confirm, that it's NOT possible to start multiple parallel
> | table functions for selecting on SQL data type TABLE OF <object>
> | storages?
> |
> | (It will take some time to create an example for that - but might be
> | necessary to post such here. Let me try without first).
> |
> | - thanks!
> |
> | rgds,
> | Frank
>
> Waiting for your example/test case...
>
> Regards
> Michel
sqlplus test program:
);
/
);
/
/
end;
/
Date: Wed, 8 Jun 2011 11:52:42 -0700 (PDT)
Message-ID: <cce87615-a9aa-45ab-ab64-e1682ff8a2c1_at_k6g2000yqc.googlegroups.com>
On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem..._at_web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705..._at_k16g2000yqm.googlegroups.com...
> |i try to distribute SQL data objects - stored in a TABLE OF <SQL
> | object-Type> - to MULTIPLE (parallel) instances of a table function,
> | by passing a select CURSOR(...) to the table function, which selects
> | from the SQL TABLE OF storage via "select * from
> | TABLE(CAST(<storage> as <storage-type>)".
> |
> | But oracle always only starts a single table function instance :-( -
> | whatever hints i provide or setting i use for the parallel table
> | function (parallel_enable ...)
> |
> | Could it be, that this is due to the fact, that my data are not
> | globally available, but only in the main thread data?
> | Can someone confirm, that it's NOT possible to start multiple parallel
> | table functions for selecting on SQL data type TABLE OF <object>
> | storages?
> |
> | (It will take some time to create an example for that - but might be
> | necessary to post such here. Let me try without first).
> |
> | - thanks!
> |
> | rgds,
> | Frank
>
> Waiting for your example/test case...
>
> Regards
> Michel
Here it is:
sqlplus test program:
- snip
set serveroutput on;
drop table test_table;
/
drop type ton_t;
/
drop type test_list;
/
drop type test_obj;
/
create table test_table
(
a number(19,0), b timestamp with time zone, c varchar2(256)
);
/
create or replace type test_obj as object(
a number(19,0), b timestamp with time zone, c varchar2(256)
);
/
create or replace type test_list as table of test_obj; /
create or replace type ton_t as table of number; /
create or replace package test_pkg
as
limit_c constant integer := 4;
type test_rec is record ( a number(19,0), b timestamp with time zone, c varchar2(256) ); type test_tab is table of test_rec; type test_cur is ref cursor return test_rec; procedure LogByList(list in test_list, no in integer default -1); procedure LogByTab(tab in test_tab, no in integer default -1); procedure LogByCursor(mycur test_cur, no in integer default -1); function TF(mycur test_cur) return test_list pipelined parallel_enable(partition mycur by hash(a));end;
/
create or replace package body test_pkg
as
procedure LogByList(list in test_list, no in integer default -1) is lo integer := list.first; hi integer := case no when -1 then list.last else no end; begin dbms_output.put_line('LogByList(): enter'); dbms_output.put_line('list.count = ' || list.count); if list.count <= 0 then dbms_output.put_line('LogByList(): ''list'' is empty!'); else for i in lo..hi loop dbms_output.put_line('a = ' || list(i).a || ', b = ' || list(i).b || ', c = ' || list(i).c); end loop; end if; dbms_output.put_line('LogByList(): exit'); end; procedure LogByTab(tab in test_tab, no in integer default -1) is lo integer := tab.first; hi integer := case no when -1 then tab.last else no end; begin dbms_output.put_line('LogByTab(): enter'); dbms_output.put_line('tab.count = ' || tab.count); if tab.count <= 0 then dbms_output.put_line('LogByTab(): ''tab'' is empty!'); else for i in lo..hi loop dbms_output.put_line('a = ' || tab(i).a || ', b = ' || tab(i).b || ', c = ' || tab(i).c); end loop; end if; dbms_output.put_line('LogByTab(): exit'); end; procedure LogByCursor(mycur test_cur, no in integer default -1) is mytab test_tab; counter integer := 0; begin dbms_output.put_line('LogByCursor( mycur, no => ''' || no || '''): enter'); loop fetch mycur bulk collect into mytab limit limit_c; exit when mytab.count <= 0; dbms_output.put_line('logByCursor(): fetched data'); for i in mytab.first..mytab.last loop dbms_output.put_line('a = ' || mytab(i).a || ', b = ' || mytab(i).b || ', c = ' || mytab(i).c); counter := counter + 1; if no != -1 and counter >= no then return; end if; end loop; end loop; dbms_output.put_line('LogByCursor(): exit'); end; function TF(mycur test_cur) return test_list pipelined parallel_enable(partition mycur by hash(a)) is sid number; counter number(19,0) := 0; myrec test_rec; mytab test_tab; mytab2 test_list := test_list(); begin select userenv('SID') into sid from dual; dbms_output.put_line('test_pkg.TF( sid => '''|| sid || ''' ): enter'); -- loop -- fetch mycur bulk collect into mytab; -- exit when mytab.count = 0; -- for i in mytab.first..mytab.last loop -- pipe row(test_obj(mytab(i).a, mytab(i).b, mytab(i).c)); -- end loop; -- end loop; loop fetch mycur into myRec; exit when mycur%NOTFOUND; mytab2.extend; mytab2(mytab2.last) := test_obj(myRec.a, myRec.b, myRec.c); end loop; for i in mytab2.first..mytab2.last loop -- attention: saves own SID in test_obj.a for indication to caller -- how many sids have been involved pipe row(test_obj(sid, mytab2(i).b, mytab2(i).c)); counter := counter + 1; end loop; dbms_output.put_line('test_pkg.TF(...): exit, piped #' || counter || ' records'); end;
end;
/
declare
type ton is table of number;
myList test_list := test_list();
myCurWeak sys_refcursor; myCurStrong test_pkg.test_cur; myList2 test_list := test_list(); sids ton_t := ton_t(); begin for i in 1..10000 loop myList.extend; myList(myList.last) := test_obj(i, sysdate, to_char(i +2)); end loop; -- save into the real table insert into test_table select * from table(cast (myList astest_list));
test_pkg.LogByList(myList,5); dbms_output.put_line('1. LogByCursor with weak ''sys_refcursor'':'); dbms_output.put_line('1.1. LogByCursor: no dynamic SQL:'); open myCurWeak for select * from table(cast (myList as test_list)); test_pkg.LogByCursor(myCurWeak,5); close myCurWeak; dbms_output.put_line('1.2. LogByCursor: dynamic SQL with ''using'':'); open myCurWeak for 'select * from table(cast (:1 as test_list))' using myList; test_pkg.LogByCursor(myCurWeak,5); close myCurWeak; dbms_output.put_line('2. LogByCursor with strong''test_pkg.test_cur'':');
dbms_output.put_line('2.1. LogByCursor without ''using'':'); open myCurStrong for select * from table(cast (myList as test_list)); test_pkg.LogByCursor(myCurStrong,5); close myCurStrong; -- -- 2.2. is not possible, causes -- PLS-00455: cursor 'MYCURSTRONG' cannot be used in dynamic SQL OPEN statement -- so disabled: -- dbms_output.put_line('2.2. LogByCursor: dynamic SQL with ''using'':'); -- open myCurStrong for 'select * from table(cast (:1 as test_list))' using myList; -- test_pkg.LogByCursor(myCurStrong); -- close myCurStrong; dbms_output.put_line('copy ''mylist'' to ''mylist2'' by streaming via table function...'); select test_obj(a, b, c) bulk collect into myList2 from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,5) */ * from table(cast (myList as test_list)) tab))); dbms_output.put_line('... saved #' || myList2.count || ' records'); select distinct(tab.a) bulk collect into sids from table(cast (myList2 as test_list)) tab; for i in sids.first..sids.last loop dbms_output.put_line('sid #' || sids(i)); end loop; dbms_output.put_line('copy physical ''test_table'' to ''mylist2'' by streaming via table function:'); select test_obj(a, b, c) bulk collect into myList2 from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from test_table tab))); dbms_output.put_line('... saved #' || myList2.count || ' records'); select distinct(tab.a) bulk collect into sids from table(cast (myList2 as test_list)) tab; for i in sids.first..sids.last loop dbms_output.put_line('sid #' || sids(i)); end loop;
end;
/
-------------------------------------------- snap -------------------------------------------------------- ==========================================================output (relevant is the trailing part showing, if multiple threads have been started):
- snip
[...] LogByList(): enter list.count = 10000 a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3 a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4 a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5 a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6 a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7 LogByList(): exit 1. LogByCursor with weak 'sys_refcursor': 1.1. LogByCursor: no dynamic SQL: LogByCursor( mycur, no => '5'): enter logByCursor(): fetched data a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3 a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4 a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5 a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6 logByCursor(): fetched data a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7 1.2. LogByCursor: dynamic SQL with 'using': LogByCursor( mycur, no => '5'): enter logByCursor(): fetched data a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3 a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4 a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5 a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6 logByCursor(): fetched data a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7 2. LogByCursor with strong 'test_pkg.test_cur': 2.1. LogByCursor without 'using': LogByCursor( mycur, no => '5'): enter logByCursor(): fetched data a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3 a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4 a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5 a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6 logByCursor(): fetched data a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7 copy 'mylist' to 'mylist2' by streaming via table function... test_pkg.TF( sid => '77' ): enter test_pkg.TF(...): exit, piped #10000 records ... saved #10000 records sid #77 copy physical 'test_table' to 'mylist2' by streaming via table function: ... saved #10000 records sid #151 sid #138 sid #244 sid #155 sid #217 sid #112 sid #185 sid #248 sid #241 sid #176
PL/SQL procedure successfully completed.
- snap
I don't get a list of multiple SIDs for the query on the 'myList'
???
best regards,
Frank
Received on Wed Jun 08 2011 - 13:52:42 CDT