Re: dynamically created cursor doesn't work for parallel pipelined functions
Date: Sat, 4 Dec 2010 11:12:05 -0800 (PST)
Message-ID: <664bd5d1-efb9-416f-9c50-f0029a414461_at_r29g2000yqj.googlegroups.com>
Wow, I am surprised at what is possible:
-- Remove the objects left behind by a previous run of this script.
-- MyDoit is created UNDER BaseDoit later on, so the subtype is dropped first.
DROP TABLE parallel_test;
DROP TYPE MyDoit;
DROP TYPE BaseDoit;
-- Test table; the queries below join it to itself on id to feed the
-- pipelined function.
CREATE TABLE parallel_test
(
    id          NUMBER(10),
    description VARCHAR2(50)
);
-- Populate parallel_test with ids 1 .. 100000.
-- One set-based INSERT ... SELECT replaces 100000 single-row inserts issued
-- from a PL/SQL loop; the generated rows are the same.
-- The statement stays inside an anonymous block so this step still reports
-- "PL/SQL procedure successfully completed." as in the posted output.
BEGIN
    INSERT INTO parallel_test (id, description)
    SELECT LEVEL,
           'Description or ' || LEVEL
    FROM   dual
    CONNECT BY LEVEL <= 100000;

    COMMIT;
END;
/
-- Base object type: one attribute, a static factory and a doit() procedure.
-- Declared NOT FINAL so that a subtype can be created UNDER it.
-- NOTE(review): the collection type "ton" is not defined in this script;
-- presumably a nested table of NUMBER that already exists - confirm.
CREATE OR REPLACE TYPE BaseDoit AS OBJECT
(
    id NUMBER,
    STATIC FUNCTION make (p_id IN NUMBER) RETURN BaseDoit,
    MEMBER PROCEDURE doit (p_sids   IN OUT NOCOPY ton,
                           p_counts IN OUT NOCOPY ton)
)
NOT FINAL;
/
CREATE OR REPLACE TYPE BODY BaseDoit AS
    -- Factory: builds a BaseDoit whose id attribute is p_id.
    STATIC FUNCTION make (p_id IN NUMBER) RETURN BaseDoit
    IS
    BEGIN
        RETURN NEW BaseDoit(p_id);
    END;
    -- Default implementation: only prints a trace line and leaves both
    -- collections untouched.
    MEMBER PROCEDURE doit (p_sids   IN OUT NOCOPY ton,
                           p_counts IN OUT NOCOPY ton)
    IS
    BEGIN
        dbms_output.put_line('BaseDoit.doit(' || id || ') invoked...');
    END;
END;
/
-- Define a strongly typed REF CURSOR type.
CREATE OR REPLACE PACKAGE parallel_ptf_api AS
    -- Row shape shared by the input cursor and the piped output:
    -- id/description from both sides of the self-join, plus the session id
    -- that test_ptf fills in.
    TYPE t_parallel_test_row IS RECORD
    (
        id1   NUMBER(10),
        desc1 VARCHAR2(50),
        id2   NUMBER(10),
        desc2 VARCHAR2(50),
        sid   NUMBER
    );
    -- Collection type returned by the pipelined function.
    TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;
    -- Strongly typed cursor: callers must supply rows of t_parallel_test_row.
    TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN t_parallel_test_row;
    -- Pipes every row fetched from p_cursor back to the caller, stamped with
    -- the session id that processed it.
    FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
        RETURN t_parallel_test_tab
        PIPELINED
        PARALLEL_ENABLE (PARTITION p_cursor BY ANY);
END parallel_ptf_api;
/
SHOW ERRORS
CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS
    -- Pipelined table function: fetches every row from p_cursor, stamps it
    -- with the session id of the session executing this invocation and pipes
    -- it to the caller.
    --
    -- p_cursor  rows shaped like t_parallel_test_row; the sid column of the
    --           incoming row is overwritten.
    -- returns   one piped row per fetched row.
    FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
        RETURN t_parallel_test_tab
        PIPELINED
        PARALLEL_ENABLE (PARTITION p_cursor BY ANY)
    IS
        l_row t_parallel_test_row;
        l_sid NUMBER;
    BEGIN
        -- The session id cannot change during one invocation, so look it up
        -- once instead of running a SELECT against dual for every row.
        SELECT userenv('SID') INTO l_sid FROM dual;

        LOOP
            FETCH p_cursor INTO l_row;
            EXIT WHEN p_cursor%NOTFOUND;

            -- The fetch overwrote sid with the cursor's fifth column,
            -- so it has to be set again for every row.
            l_row.sid := l_sid;
            PIPE ROW (l_row);
        END LOOP;

        -- Release the input cursor once it is exhausted.
        CLOSE p_cursor;
        RETURN;
    END test_ptf;
END parallel_ptf_api;
/
SHOW ERRORS
PROMPT
PROMPT Serial Execution
PROMPT ================
-- Baseline without a parallel hint: count the piped rows per session id.
-- The comma join is kept so the query text matches the other variants.
SELECT   ptf.sid,
         COUNT(*)
FROM     TABLE(
             parallel_ptf_api.test_ptf(
                 CURSOR(
                     SELECT t1.id,
                            t1.description,
                            t2.id,
                            t2.description,
                            NULL
                     FROM   parallel_test t1,
                            parallel_test t2
                     WHERE  t1.id = t2.id
                 )
             )
         ) ptf
GROUP BY ptf.sid;
PROMPT
PROMPT Parallel Execution
PROMPT ==================
-- Same query as the serial run, but the cursor's SELECT carries a
-- parallel(t1,5) hint; rows are again counted per session id.
SELECT   ptf.sid,
         COUNT(*)
FROM     TABLE(
             parallel_ptf_api.test_ptf(
                 CURSOR(
                     SELECT /*+ parallel(t1,5) */
                            t1.id,
                            t1.description,
                            t2.id,
                            t2.description,
                            NULL
                     FROM   parallel_test t1,
                            parallel_test t2
                     WHERE  t1.id = t2.id
                 )
             )
         ) ptf
GROUP BY ptf.sid;
PROMPT
PROMPT Parallel Execution 2
PROMPT ==================
SET SERVEROUTPUT ON;
-- Variant 2: the hinted query is opened as a dynamic cursor variable and that
-- variable is handed to the pipelined function.
-- The run output posted with this script shows a single SID for all 100000
-- rows in this variant.
DECLARE
    v_sids   ton := ton();
    v_counts ton := ton();
    -- The strongly typed declaration below was tried and left commented out:
    --   v_cur parallel_ptf_api.t_parallel_test_ref_cursor;
    -- NOTE(review): presumably because OPEN ... FOR <string> requires a weakly
    -- typed cursor variable - confirm.
    v_cur    SYS_REFCURSOR;

    -- Opens p_refCursor for the hinted self-join, built as dynamic SQL.
    PROCEDURE OpenCursor (p_refCursor OUT SYS_REFCURSOR)
    IS
    BEGIN
        OPEN p_refCursor FOR
            'SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null FROM parallel_test t1, parallel_test t2 where t1.id = t2.id';
    END;
BEGIN
    OpenCursor(v_cur);

    SELECT   sid, COUNT(*)
    BULK COLLECT INTO v_sids, v_counts
    FROM     TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
    GROUP BY sid;

    -- BULK COLLECT fills the collections densely from index 1, so 1 .. COUNT
    -- visits every element and is simply skipped when nothing came back
    -- (FIRST .. LAST would raise ORA-06502 on an empty collection).
    FOR i IN 1 .. v_sids.COUNT LOOP
        dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
    END LOOP;
END;
/
PROMPT
PROMPT Parallel Execution 3
PROMPT ==================
SET SERVEROUTPUT ON;
-- Variant 3: a subtype MyDoit is generated at run time. Its doit() override
-- contains the hinted query as static SQL with a CURSOR() expression.
-- It is instantiated through dynamic SQL and called through a BaseDoit
-- variable.
DECLARE
    v_instance BaseDoit;
    v_sids     ton := ton();
    v_counts   ton := ton();

    -- Creates (or replaces) the MyDoit type spec and body via EXECUTE IMMEDIATE.
    PROCEDURE CreateMyDoit
    IS
        v_cmd VARCHAR2(4096 CHAR);
    BEGIN
        v_cmd := 'create or replace type MyDoit under BaseDoit ( ' ||
                 ' static function make(p_id in number) ' ||
                 ' return MyDoit, ' ||
                 ' overriding member procedure doit( ' ||
                 ' p_sids in out nocopy ton, ' ||
                 ' p_counts in out nocopy ton) ' ||
                 ' )';
        EXECUTE IMMEDIATE v_cmd;

        -- The trace message now reads ") invoked..." with a space, matching
        -- BaseDoit.doit and the posted output "MyDoit.doit(11) invoked...".
        v_cmd := 'create or replace type body MyDoit as ' ||
                 ' static function make(p_id in number) ' ||
                 ' return MyDoit ' ||
                 ' is ' ||
                 ' begin ' ||
                 ' return new MyDoit(p_id); ' ||
                 ' end; ' ||
                 ' ' ||
                 ' overriding member procedure doit( ' ||
                 ' p_sids in out nocopy ton, ' ||
                 ' p_counts in out nocopy ton) ' ||
                 ' is ' ||
                 ' begin ' ||
                 ' dbms_output.put_line(''MyDoit.doit('' || id || '') invoked...''); ' ||
                 ' SELECT sid, count(*) bulk collect into p_sids, p_counts ' ||
                 ' FROM TABLE(parallel_ptf_api.test_ptf(CURSOR( ' ||
                 ' SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, null ' ||
                 ' FROM parallel_test t1, parallel_test t2 ' ||
                 ' where t1.id = t2.id ' ||
                 ' ))) ' ||
                 ' GROUP BY sid; ' ||
                 ' end; ' ||
                 ' end; ';
        EXECUTE IMMEDIATE v_cmd;
    END;
BEGIN
    CreateMyDoit;

    -- MyDoit does not exist when this block is compiled, so the instance is
    -- obtained through dynamic SQL and held in a supertype variable.
    EXECUTE IMMEDIATE 'select MyDoit.Make(11) from dual' INTO v_instance;

    v_instance.doit(v_sids, v_counts);

    -- 1 .. COUNT is skipped when nothing was collected.
    FOR i IN 1 .. v_sids.COUNT LOOP
        dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
    END LOOP;
END;
/
The output is:
SQL> @test
Table dropped.
Type dropped.
Type dropped.
Table created.
PL/SQL procedure successfully completed.
Type created.
Type body created.
Package created.
No errors.
Package body created.
No errors.
Serial Execution
SID COUNT(*)
---------- ----------
400 100000
Parallel Execution
SID COUNT(*)
---------- ----------
429 20130
458 20035
547 20040
594 19886
390 19909
Parallel Execution 2
400, 100000
PL/SQL procedure successfully completed.
Parallel Execution 3
MyDoit.doit(11) invoked...
429, 19886
390, 20130
368, 20035
519, 19909
458, 20040
PL/SQL procedure successfully completed.
SQL> Received on Sat Dec 04 2010 - 13:12:05 CST