Re: parallel pipelined table functions with cursor selecting from table(cast(SQL collection)) doesn't work

From: Frank Bergemann <FBergemann_at_web.de>
Date: Wed, 8 Jun 2011 11:52:42 -0700 (PDT)
Message-ID: <cce87615-a9aa-45ab-ab64-e1682ff8a2c1_at_k6g2000yqc.googlegroups.com>



On 8 Jun., 17:20, "Michel Cadot" <micadot{at}altern{dot}org> wrote:
> "Frank Bergemann" <FBergem..._at_web.de> a écrit dans le message de news:
> 25996c2b-6765-4781-ac87-f581ba705..._at_k16g2000yqm.googlegroups.com...
> |i try to distribute SQL data objects - stored in a TABLE OF <SQL
> | object-Type> - to MULTIPLE (parallel) instances of a table function,
> | by passing a select CURSOR(...) to the table function, which selects
> | from the  SQL TABLE OF storage  via "select * from
> | TABLE(CAST(<storage> as <storage-type>)".
> |
> | But oracle always only starts a single table function instance :-( -
> | whatever hints i provide or setting i use for the parallel table
> | function (parallel_enable ...)
> |
> | Could it be, that this is due to the fact, that my data are not
> | globally available, but only in the main thread data?
> | Can someone confirm, that it's NOT possible to start multiple parallel
> | table functions for selecting on SQL data type TABLE OF <object>
> | storages?
> |
> | (It will take some time to create an example for that - but might be
> | necessary to post such here. Let me try without first).
> |
> | - thanks!
> |
> | rgds,
> | Frank
>
> Waiting for your example/test case...
>
> Regards
> Michel

Here it is:



sqlplus test program:
  • snip
    set serveroutput on;

-- Test fixture setup: drop leftovers from a previous run, then create the
-- physical table and the SQL types used by the package and the driver block.
--
-- SQL*Plus notes:
--   * A plain SQL statement (drop, create table) is executed by its
--     terminating ';'. A following '/' would re-execute the buffer, i.e. run
--     the same statement a second time and fail, so no '/' is used here.
--   * CREATE TYPE is entered in PL/SQL mode and is only executed by a '/'
--     on a line of its own.
--   * With the default SQLBLANKLINES OFF, a blank line inside a plain SQL
--     statement ends statement entry, so CREATE TABLE contains none.
--   * On a first run the drops report "does not exist" errors; these are
--     harmless.

drop table test_table;

drop type ton_t;

drop type test_list;

drop type test_obj;

-- Physical counterpart of the in-memory collection, used for comparison.
create table test_table
(
	a number(19,0),
	b timestamp with time zone,
	c varchar2(256)
);

-- SQL object type with the same attributes as the columns of test_table.
create or replace type test_obj as object(
	a number(19,0),
	b timestamp with time zone,
	c varchar2(256)
);
/

-- SQL nested table of test_obj; the "TABLE OF <object>" storage under test.
create or replace type test_list as table of test_obj;
/

-- SQL nested table of numbers; receives the distinct session ids.
create or replace type ton_t as table of number;
/

-- Package specification for the parallel pipelined table function test.
-- Declares the PL/SQL-side record/collection/cursor types, three logging
-- helpers that print rows via dbms_output, and the table function TF.
create or replace package test_pkg
as

	-- Number of rows fetched per bulk collect in LogByCursor.
        limit_c constant integer := 4;

	-- PL/SQL record with the same three fields as SQL type test_obj.
	type test_rec is record (
		a number(19,0),
		b timestamp with time zone,
		c varchar2(256)
	);
	-- PL/SQL nested table of test_rec (target of bulk fetches).
	type test_tab is table of test_rec;

	-- Strongly typed ref cursor returning test_rec. TF partitions its input
	-- by hash on column "a" of this cursor; Oracle only permits hash/range
	-- partitioning on a strongly typed ref cursor.
	type test_cur is ref cursor return test_rec;

	-- Prints rows of a SQL collection; "no" is the number of rows to print,
	-- -1 (the default) means all rows.
	procedure LogByList(list in test_list, no in integer default -1);

	-- Same as LogByList, but for the PL/SQL collection type test_tab.
	procedure LogByTab(tab in test_tab, no in integer default -1);

	-- Fetches from an open cursor and prints the rows; "no" is the number of
	-- rows to print, -1 (the default) means fetch and print all rows.
	procedure LogByCursor(mycur test_cur, no in integer default -1);

	-- Pipelined, parallel-enabled table function. Reads rows from mycur and
	-- pipes them back as test_obj rows, with attribute "a" replaced by the
	-- session id (SID) of the session executing the function.
	function TF(mycur test_cur)
        return test_list pipelined
        parallel_enable(partition mycur by hash(a));
end;
/

create or replace package body test_pkg
as

	-- Prints the rows of a SQL nested table via dbms_output.
	--   list : collection to print
	--   no   : number of leading rows to print; -1 (default) prints all.
	--          A value beyond the end of the collection is clamped to the
	--          last index instead of raising "subscript beyond count".
	procedure LogByList(list in test_list, no in integer default -1)
	is
		lo integer := list.first;
		hi integer := case no when -1 then list.last else least(no, list.last) end;
	begin
		dbms_output.put_line('LogByList(): enter');
		dbms_output.put_line('list.count = ' || list.count);

		if list.count <= 0 then
			-- first/last are NULL for an empty collection, so never loop here
			dbms_output.put_line('LogByList(): ''list'' is empty!');
		else
			for i in lo..hi loop
				dbms_output.put_line(
					'a = ' || list(i).a || ', b = ' || list(i).b || ', c = ' || list(i).c);
			end loop;
		end if;
		dbms_output.put_line('LogByList(): exit');
	end;

	-- Prints the rows of a PL/SQL nested table via dbms_output.
	--   tab : collection to print
	--   no  : number of leading rows to print; -1 (default) prints all.
	--         A value beyond the end of the collection is clamped to the
	--         last index instead of raising "subscript beyond count".
	procedure LogByTab(tab in test_tab, no in integer default -1)
	is
		lo integer := tab.first;
		hi integer := case no when -1 then tab.last else least(no, tab.last) end;
	begin
		dbms_output.put_line('LogByTab(): enter');
		dbms_output.put_line('tab.count = ' || tab.count);

		if tab.count <= 0 then
			-- first/last are NULL for an empty collection, so never loop here
			dbms_output.put_line('LogByTab(): ''tab'' is empty!');
		else
			for i in lo..hi loop
				dbms_output.put_line(
					'a = ' || tab(i).a || ', b = ' || tab(i).b || ', c = ' || tab(i).c);
			end loop;
		end if;
		dbms_output.put_line('LogByTab(): exit');
	end;

	-- Fetches rows from an already opened cursor in batches of limit_c and
	-- prints them via dbms_output.
	--   mycur : open cursor; it is not closed here, the caller closes it
	--   no    : number of rows to print; -1 (default) fetches until exhausted.
	-- When the row limit is reached the procedure returns immediately,
	-- without printing the "exit" line.
	procedure LogByCursor(mycur test_cur, no in integer default -1)
	is
		mytab test_tab;
		counter integer := 0;
	begin
		dbms_output.put_line('LogByCursor( mycur, no => ''' || no || '''): enter');
		loop
			fetch mycur bulk collect into mytab limit limit_c;
			-- with LIMIT, test the batch size rather than %NOTFOUND so that a
			-- final, partially filled batch is still processed
			exit when mytab.count <= 0;
			dbms_output.put_line('logByCursor(): fetched data');
			for i in 1..mytab.count loop
				dbms_output.put_line(
					'a = ' || mytab(i).a || ', b = ' || mytab(i).b || ', c = ' || mytab(i).c);
				counter := counter + 1;
				if no != -1 and counter >= no then
					return;
				end if;
			end loop;
		end loop;
		dbms_output.put_line('LogByCursor(): exit');
	end;

	-- Pipelined, parallel-enabled table function.
	--   mycur : input rows; with parallel execution each instance receives
	--           the rows of one hash partition on column "a"
	-- Pipes one test_obj per input row. Attribute "a" of every piped row is
	-- overwritten with the SID of the executing session, so the caller can
	-- count how many sessions took part; "b" and "c" are passed through.
	--
	-- Rows are piped as they are fetched instead of being buffered first:
	-- this keeps the function truly pipelined and also copes with an instance
	-- that receives no rows at all (looping over first..last of an empty
	-- collection would raise ORA-06502 because both bounds are NULL).
	--
	-- NOTE(review): the sample run in this post shows no enter/exit lines for
	-- the parallel case, presumably because dbms_output written by parallel
	-- slave sessions is not returned to the calling session.
	function TF(mycur test_cur)
        return test_list pipelined
        parallel_enable(partition mycur by hash(a))
        is
		sid number;
		counter number(19,0) := 0;
		myrec test_rec;
	begin
		select userenv('SID') into sid from dual;
		dbms_output.put_line('test_pkg.TF( sid => '''|| sid || ''' ): enter');

		loop
			fetch mycur into myrec;
			exit when mycur%notfound;
			-- attention: saves own SID in test_obj.a for indication to caller
			--	how many sids have been involved
			pipe row(test_obj(sid, myrec.b, myrec.c));
			counter := counter + 1;
		end loop;
		close mycur;

		dbms_output.put_line('test_pkg.TF(...): exit, piped #' || counter || ' records');
		return;
	end;

end;
/

-- Driver: builds a 10000-element in-memory collection, copies it into
-- test_table, exercises the logging helpers with weak and strong ref cursors,
-- and then streams (a) the in-memory collection and (b) the physical table
-- through test_pkg.TF, printing the distinct session ids that processed the
-- rows. More than one SID means the table function ran in parallel.
declare

	myList test_list := test_list();

	myCurWeak sys_refcursor;
	myCurStrong test_pkg.test_cur;

	myList2 test_list := test_list();

	sids ton_t := ton_t();
begin
	for i in 1..10000 loop
		myList.extend;
		myList(myList.last) := test_obj(i, sysdate, to_char(i + 2));
	end loop;

	-- save into the real table
	insert into test_table (a, b, c)
	select tab.a, tab.b, tab.c
	from table(cast (myList as test_list)) tab;

	test_pkg.LogByList(myList,5);
	dbms_output.put_line('1. LogByCursor with weak ''sys_refcursor'':');

	dbms_output.put_line('1.1. LogByCursor: no dynamic SQL:');
	open myCurWeak for select * from table(cast (myList as test_list));
	test_pkg.LogByCursor(myCurWeak,5);
	close myCurWeak;

	dbms_output.put_line('1.2. LogByCursor: dynamic SQL with ''using'':');
	open myCurWeak for 'select * from table(cast (:1 as test_list))'
		using myList;
	test_pkg.LogByCursor(myCurWeak,5);
	close myCurWeak;

	dbms_output.put_line('2. LogByCursor with strong ''test_pkg.test_cur'':');
	dbms_output.put_line('2.1. LogByCursor without ''using'':');
	open myCurStrong for select * from table(cast (myList as test_list));
	test_pkg.LogByCursor(myCurStrong,5);
	close myCurStrong;

	-- 2.2. (dynamic SQL with 'using' on the strong cursor) is not possible.
	-- Opening myCurStrong for the statement text
	--   select * from table(cast (:1 as test_list))
	-- with "using myList" fails to compile with
	--   PLS-00455: cursor 'MYCURSTRONG' cannot be used in dynamic SQL OPEN statement
	-- so that variant is left out.

	-- (a) source is the in-memory collection
	dbms_output.put_line('copy ''mylist'' to ''mylist2'' by streaming via table function...');
	select test_obj(a, b, c) bulk collect into myList2
	from table(test_pkg.TF(CURSOR(
		select /*+ parallel(tab,5) */ tab.a, tab.b, tab.c
		from table(cast (myList as test_list)) tab)));
	dbms_output.put_line('... saved #' || myList2.count || ' records');

	-- TF stored the executing SID in attribute "a"
	select distinct tab.a bulk collect into sids
	from table(cast (myList2 as test_list)) tab;
	-- 1..count also works when no row came back (first/last would be NULL)
	for i in 1..sids.count loop
		dbms_output.put_line('sid #' || sids(i));
	end loop;

	-- (b) source is the physical table
	dbms_output.put_line('copy physical ''test_table'' to ''mylist2'' by streaming via table function:');
	select test_obj(a, b, c) bulk collect into myList2
	from table(test_pkg.TF(CURSOR(
		select /*+ parallel(tab,10) */ tab.a, tab.b, tab.c
		from test_table tab)));
	dbms_output.put_line('... saved #' || myList2.count || ' records');

	select distinct tab.a bulk collect into sids
	from table(cast (myList2 as test_list)) tab;
	for i in 1..sids.count loop
		dbms_output.put_line('sid #' || sids(i));
	end loop;

end;
/

-------------------------------------------- snap
--------------------------------------------------------

==========================================================
output (the relevant part is at the end: the "sid #..." lines show whether more than one session, i.e. more than one table function instance, was started):
  • snip
    [...]
    LogByList(): enter
    list.count = 10000
    a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
    a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
    a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
    a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
    a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
    LogByList(): exit
    1. LogByCursor with weak 'sys_refcursor':
    1.1. LogByCursor: no dynamic SQL:
    LogByCursor( mycur, no => '5'): enter
    logByCursor(): fetched data
    a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
    a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
    a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
    a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
    logByCursor(): fetched data
    a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
    1.2. LogByCursor: dynamic SQL with 'using':
    LogByCursor( mycur, no => '5'): enter
    logByCursor(): fetched data
    a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
    a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
    a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
    a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
    logByCursor(): fetched data
    a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
    2. LogByCursor with strong 'test_pkg.test_cur':
    2.1. LogByCursor without 'using':
    LogByCursor( mycur, no => '5'): enter
    logByCursor(): fetched data
    a = 1, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 3
    a = 2, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 4
    a = 3, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 5
    a = 4, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 6
    logByCursor(): fetched data
    a = 5, b = 08-JUN-11 08.48.10.000000 PM EUROPE/BERLIN, c = 7
    copy 'mylist' to 'mylist2' by streaming via table function...
    test_pkg.TF( sid => '77' ): enter
    test_pkg.TF(...): exit, piped #10000 records
    ... saved #10000 records
    sid #77
    copy physical 'test_table' to 'mylist2' by streaming via table function:
    ... saved #10000 records
    sid #151
    sid #138
    sid #244
    sid #155
    sid #217
    sid #112
    sid #185
    sid #248
    sid #241
    sid #176

PL/SQL procedure successfully completed.

  • snap

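For the query on the in-memory collection 'myList' I don't get a list of multiple SIDs: only a single SID (77) is reported, i.e. only one table function instance was started, whereas the same query on the physical table 'test_table' was spread over ten sessions.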

???

best regards,
Frank

Received on Wed Jun 08 2011 - 13:52:42 CDT

Original text of this message