Re: parallel pipelined table functions with cursor selecting from table(cast(SQL collection)) doesn't work

From: Frank Bergemann <FBergemann_at_web.de>
Date: Thu, 9 Jun 2011 01:42:03 -0700 (PDT)
Message-ID: <f10615fc-0401-4cca-9057-eeb144671b5a_at_p9g2000prh.googlegroups.com>



On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem..._at_web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705..._at_k16g2000yqm.googlegroups.com...
> |i try to distribute SQL data objects - stored in a TABLE OF <SQL
> | object-Type> - to MULTIPLE (parallel) instances of a table function,
> | by passing a select CURSOR(...) to the table function, which selects
> | from the  SQL TABLE OF storage  via "select * from
> | TABLE(CAST(<storage> as <storage-type>)".
> |
> | But oracle always only starts a single table function instance :-( -
> | whatever hints i provide or setting i use for the parallel table
> | function (parallel_enable ...)
> |
> | Could it be, that this is due to the fact, that my data are not
> | globally available, but only in the main thread data?
> | Can someone confirm, that it's NOT possible to start multiple parallel
> | table functions for selecting on SQL data type TABLE OF <object>
> | storages?
> |
> | (It will take some time to create an example for that - but might be
> | necessary to post such here. Let me try without first).
> |
> | - thanks!
> |
> | rgds,
> | Frank
>
> Waiting for your example/test case...
>
> Regards
> Michel

example once again - but this time cut back to the essentials:



sqlplus test program:
  • snip
    set serveroutput on;

drop table test_table;
/

drop type ton_t;
/

drop type test_list;
/

drop type test_obj;
/

create table test_table
(

	a number(19,0),
	b timestamp with time zone,
	c varchar2(256)

);
/

create or replace type test_obj as object(

	a number(19,0),
	b timestamp with time zone,
	c varchar2(256)

);
/

create or replace type test_list as table of test_obj; /

create or replace type ton_t as table of number; /

create or replace package test_pkg
as

	type test_rec is record (
		a number(19,0),
		b timestamp with time zone,
		c varchar2(256)
	);
	type test_tab is table of test_rec;

	type test_cur is ref cursor return test_rec;

	function TF(mycur test_cur)
        return test_list pipelined
        parallel_enable(partition mycur by hash(a));
end;
/

create or replace package body test_pkg
as

	function TF(mycur test_cur)
        return test_list pipelined
        parallel_enable(partition mycur by hash(a))
        is
		sid number;
		counter number(19,0) := 0;
		myrec test_rec;
		mytab test_tab;
		mytab2 test_list := test_list();
	begin
		select userenv('SID') into sid from dual;
		dbms_output.put_line('test_pkg.TF( sid => '''|| sid || ''' ):
enter');
--		loop
--			fetch mycur bulk collect into mytab;
--			exit when mytab.count = 0;
--			for i in mytab.first..mytab.last loop
--				pipe row(test_obj(mytab(i).a, mytab(i).b, mytab(i).c));
--			end loop;
--		end loop;

		loop
			fetch mycur into myRec;
			exit when mycur%NOTFOUND;
			mytab2.extend;
			mytab2(mytab2.last) := test_obj(myRec.a, myRec.b, myRec.c);

		end loop;
		for i in mytab2.first..mytab2.last loop
			-- attention: saves own SID in test_obj.a for indication to caller
			--	how many sids have been involved
			pipe row(test_obj(sid, mytab2(i).b, mytab2(i).c));
			counter := counter + 1;
		end loop;
		dbms_output.put_line('test_pkg.TF( sid => '''|| sid || ''' ): exit,
piped #' || counter || ' records');
	end;

end;
/

declare

	myList test_list := test_list();
	myList2 test_list := test_list();
	sids ton_t := ton_t();
begin
	for i in 1..10000 loop
		myList.extend; myList(myList.last) := test_obj(i, sysdate, to_char(i
+2));
	end loop;

	-- save into the real table
	insert into test_table select * from table(cast (myList as
test_list));

        dbms_output.put_line(chr(10) || 'copy ''mylist'' to ''mylist2'' by streaming via table function...');

	select test_obj(a, b, c) bulk collect into myList2
	from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from
table(cast (myList as test_list)) tab)));
	dbms_output.put_line('... saved #' || myList2.count || ' records');
	select distinct(tab.a) bulk collect into sids from table(cast
(myList2 as test_list)) tab;
	dbms_output.put_line('worker thread''s sid list:');
	for i in sids.first..sids.last loop
		dbms_output.put_line('sid #' || sids(i));
	end loop;

	dbms_output.put_line(chr(10) || 'copy physical ''test_table'' to
''mylist2'' by streaming via table function:');
	select test_obj(a, b, c) bulk collect into myList2
	from table(test_pkg.TF(CURSOR(select /*+ parallel(tab,10) */ * from
test_table tab)));
	dbms_output.put_line('... saved #' || myList2.count || ' records');
	select distinct(tab.a) bulk collect into sids from table(cast
(myList2 as test_list)) tab;
	dbms_output.put_line('worker thread''s sid list:');
	for i in sids.first..sids.last loop
		dbms_output.put_line('sid #' || sids(i));
	end loop;

end;
/

-------------------------------------------- snap
--------------------------------------------------------



output:
  • snip
    [...] copy 'mylist' to 'mylist2' by streaming via table function... test_pkg.TF( sid => '80' ): enter test_pkg.TF( sid => '80' ): exit, piped #10000 records ... saved #10000 records worker thread's sid list: sid #80

copy physical 'test_table' to 'mylist2' by streaming via table function:
... saved #10000 records
worker thread's sid list:

sid #244
sid #77
sid #222
sid #253
sid #95
sid #96
sid #74
sid #136
sid #142
sid #79

PL/SQL procedure successfully completed.

-------------------------------------------- snap
--------------------------------------------------------

I don't get a list of multiple SIDs for the query on the 'myList'

???

best regards,
Frank Received on Thu Jun 09 2011 - 03:42:03 CDT

Original text of this message