Re: dynamically created cursor doesn't work for parallel pipelined functions

From: Frank Bergemann <FBergemann_at_web.de>
Date: Sat, 4 Dec 2010 11:12:05 -0800 (PST)
Message-ID: <664bd5d1-efb9-416f-9c50-f0029a414461_at_r29g2000yqj.googlegroups.com>




wow, i am surprised, what's all possible:

drop table parallel_test;

drop type MyDoit;

drop type BaseDoit;

CREATE TABLE parallel_test (
  id NUMBER(10),
  description VARCHAR2(50)
);

BEGIN
  FOR i IN 1 .. 100000 LOOP
    INSERT INTO parallel_test (id, description)     VALUES (i, 'Description or ' || i);
  END LOOP;
  COMMIT;
END;
/

create or replace type BaseDoit as object (

	id number,
	static function make(p_id in number)
	return BaseDoit,
	member procedure doit(
		p_sids in out nocopy ton,
		p_counts in out nocopy ton)

) not final;
/

create or replace type body BaseDoit as

	static function make(p_id in number)
	return BaseDoit
	is
	begin
		return new BaseDoit(p_id);
	end;

    member procedure doit(
		p_sids in out nocopy ton,
		p_counts in out nocopy ton)
	is
	begin
		dbms_output.put_line('BaseDoit.doit(' || id || ') invoked...');
	end;

end;
/
  • Define a strongly typed REF CURSOR type.

CREATE OR REPLACE PACKAGE parallel_ptf_api AS

  TYPE t_parallel_test_row IS RECORD (
    id1 NUMBER(10),
    desc1 VARCHAR2(50),
    id2 NUMBER(10),
    desc2 VARCHAR2(50),
    sid NUMBER
  );

  TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

  TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN t_parallel_test_row;

  FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)     RETURN t_parallel_test_tab PIPELINED     PARALLEL_ENABLE(PARTITION p_cursor BY any);

END parallel_ptf_api;
/

SHOW ERRORS CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS

  FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)     RETURN t_parallel_test_tab PIPELINED     PARALLEL_ENABLE(PARTITION p_cursor BY any)   IS
    l_row t_parallel_test_row;
  BEGIN
    LOOP

      FETCH p_cursor
      INTO  l_row;
      EXIT WHEN p_cursor%NOTFOUND;

	  select userenv('SID') into l_row.sid from dual;

      PIPE ROW (l_row);

    END LOOP;
    RETURN;
  END test_ptf;

END parallel_ptf_api;
/

SHOW ERRORS PROMPT
PROMPT Serial Execution

PROMPT ================

SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT t1.id, t1.description, t2.id, t2.description, null
                                              FROM   parallel_test t1,
parallel_test t2
												where t1.id = t2.id
                                             )
                                      )
            ) t2

GROUP BY sid;

PROMPT
PROMPT Parallel Execution

PROMPT ==================

SELECT sid, count(*)
FROM TABLE(parallel_ptf_api.test_ptf(CURSOR(SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null
                                              FROM   parallel_test t1,
parallel_test t2
												where t1.id = t2.id
                                             )
                                       )
            ) t2

GROUP BY sid;

PROMPT
PROMPT Parallel Execution 2

PROMPT ==================

set serveroutput on;

declare

	v_sids ton := ton();
	v_counts ton := ton();
--	v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
	v_cur sys_refcursor;

	procedure OpenCursor(p_refCursor out sys_refcursor)
	is
	begin
		open p_refCursor  for 'SELECT /*+ parallel(t1,5) */ t1.id,
t1.description, t2.id, t2.description, null
                                              FROM   parallel_test t1,
parallel_test t2
												where t1.id = t2.id';
	end;

begin

        OpenCursor(v_cur);

	SELECT sid, count(*) bulk collect into v_sids, v_counts
	FROM   TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
	GROUP BY sid;

	for i in v_sids.FIRST.. v_sids.LAST loop
		dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
	end loop;

end;
/

PROMPT
PROMPT Parallel Execution 3

PROMPT ==================

set serveroutput on;

declare

	instance BaseDoit;
	v_sids ton := ton();
	v_counts ton := ton();

	procedure CreateMyDoit
	is
		cmd varchar2(4096 char);
	begin
		cmd := 'create or replace type MyDoit under BaseDoit ( ' ||
					'	static function make(p_id in number) ' ||
					'	return MyDoit, ' ||
					'	overriding member procedure doit( ' ||
					'			p_sids in out nocopy ton,  ' ||
					'			p_counts in out nocopy ton) ' ||
					' )';
		execute immediate cmd;

		cmd := 'create or replace type body MyDoit as ' ||
					'	static function make(p_id in number) ' ||
					'	return MyDoit ' ||
					'	is ' ||
					'	begin ' ||
					'		return new MyDoit(p_id); ' ||
					'	end; ' ||
					' ' ||
					'	overriding member procedure doit( ' ||
					'		p_sids in out nocopy ton, ' ||
					'		p_counts in out nocopy ton) ' ||
					'	is ' ||
					'	begin ' ||
					'		dbms_output.put_line(''MyDoit.doit('' || id || '')
invoked...''); ' ||
					'		SELECT sid, count(*) bulk collect into p_sids, p_counts ' ||
					'		FROM   TABLE(parallel_ptf_api.test_ptf(CURSOR( ' ||
					'							SELECT /*+ parallel(t1,5) */ t1.id, t1.description,
t2.id, t2.description, null ' ||
					'							FROM   parallel_test t1, parallel_test t2 ' ||
					'							where t1.id = t2.id ' ||
					'                      ))) ' ||
					'		GROUP BY sid; ' ||


					'	end; ' ||
					' end; ';
		execute immediate cmd;

	end;

begin

	CreateMyDoit;
	execute immediate 'select MyDoit.Make(11)  from dual' into instance;
	instance.doit(v_sids, v_counts);

	if v_sids.COUNT > 0 then
		for i in v_sids.FIRST.. v_sids.LAST loop
			dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
		end loop;
	end if;

end;
/



The output is:

SQL> _at_test

Table dropped.

Type dropped.

Type dropped.

Table created.

PL/SQL procedure successfully completed.

Type created.

Type body created.

Package created.

No errors.

Package body created.

No errors.

Serial Execution


       SID COUNT(*)
---------- ----------

       400 100000

Parallel Execution


       SID COUNT(*)
---------- ----------

       429	20130
       458	20035
       547	20040
       594	19886
       390	19909


Parallel Execution 2



400, 100000

PL/SQL procedure successfully completed.

Parallel Execution 3



MyDoit.doit(11) invoked...
429, 19886
390, 20130
368, 20035
519, 19909
458, 20040

PL/SQL procedure successfully completed.

SQL> Received on Sat Dec 04 2010 - 13:12:05 CST

Original text of this message