Re: dynamically created cursor doesn't work for parallel pipelined functions
Date: Sat, 4 Dec 2010 06:48:05 -0800 (PST)
Message-ID: <e99c92a2-ed2b-4e21-ac4f-174f65bb911b_at_j25g2000yqa.googlegroups.com>
Here is a solution to start with, but I have to get around some dependencies:
-- Remove the objects of a previous run so that the script can be re-run.
-- MyDoit is dropped before BaseDoit because it is created as a subtype of it.
-- On a first run these statements report errors since nothing exists yet.
DROP TABLE parallel_test;
DROP TYPE MyDoit;
DROP TYPE BaseDoit;
-- Test table; the demo queries below join it to itself on id.
CREATE TABLE parallel_test (
  id          NUMBER(10),
  description VARCHAR2(50)
);
-- Populate parallel_test with 50000 rows (ids 1 .. 50000).
-- The rows come from one set-based INSERT driven by a CONNECT BY LEVEL row
-- generator instead of 50000 single-row inserts. The statement stays inside
-- an anonymous block so SQL*Plus still reports one completed PL/SQL procedure.
BEGIN
  INSERT INTO parallel_test (id, description)
  SELECT LEVEL,
         'Description or ' || LEVEL
  FROM   dual
  CONNECT BY LEVEL <= 50000;
  COMMIT;
END;
/
-- Base object type with a single overridable procedure, doit().
-- NOT FINAL allows subtypes (MyDoit further down) to be created under it.
-- NOTE(review): the collection type "ton" is not created by this script;
-- presumably a nested table of NUMBER that already exists in the schema.
CREATE OR REPLACE TYPE BaseDoit AS OBJECT (
  id NUMBER,
  MEMBER PROCEDURE doit(
    p_sids   IN OUT NOCOPY ton,
    p_counts IN OUT NOCOPY ton
  )
) NOT FINAL;
/
-- Default implementation of BaseDoit.doit(): it only announces itself through
-- DBMS_OUTPUT and does not touch either collection argument.
CREATE OR REPLACE TYPE BODY BaseDoit AS
  MEMBER PROCEDURE doit(
    p_sids   IN OUT NOCOPY ton,
    p_counts IN OUT NOCOPY ton
  ) IS
  BEGIN
    DBMS_OUTPUT.PUT_LINE('BaseDoit.doit() invoked');
  END doit;
END;
/
-- Define a strongly typed REF CURSOR type.
CREATE OR REPLACE PACKAGE parallel_ptf_api AS
  -- Shape of one piped row: id and description of both joined parallel_test
  -- rows, plus the id of the session that piped the row.
  TYPE t_parallel_test_row IS RECORD (
    id1   NUMBER(10),
    desc1 VARCHAR2(50),
    id2   NUMBER(10),
    desc2 VARCHAR2(50),
    sid   NUMBER
  );

  -- Collection type returned by the pipelined function.
  TYPE t_parallel_test_tab IS TABLE OF t_parallel_test_row;

  -- Strongly typed cursor whose rows have the shape of t_parallel_test_row.
  TYPE t_parallel_test_ref_cursor IS REF CURSOR RETURN t_parallel_test_row;

  -- Pipes every row fetched from p_cursor with its sid field filled in.
  -- PARTITION p_cursor BY ANY places no restriction on how the cursor rows
  -- are distributed when the function is executed in parallel.
  FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab
    PIPELINED
    PARALLEL_ENABLE (PARTITION p_cursor BY ANY);
END parallel_ptf_api;
/
SHOW ERRORS
CREATE OR REPLACE PACKAGE BODY parallel_ptf_api AS
  -- Pipes every row fetched from p_cursor back to the caller, with the sid
  -- field set to the id of the session that executes this invocation.
  --
  -- p_cursor : open cursor whose rows match t_parallel_test_row; it is
  --            fetched until exhausted and is not closed here.
  -- returns  : the fetched rows, piped one at a time.
  FUNCTION test_ptf (p_cursor IN t_parallel_test_ref_cursor)
    RETURN t_parallel_test_tab PIPELINED
    PARALLEL_ENABLE(PARTITION p_cursor BY ANY)
  IS
    l_row t_parallel_test_row;
    l_sid NUMBER;
  BEGIN
    -- The session id cannot change while this invocation runs, so it is
    -- looked up once here rather than once per fetched row.
    SELECT USERENV('SID') INTO l_sid FROM dual;

    LOOP
      FETCH p_cursor INTO l_row;
      EXIT WHEN p_cursor%NOTFOUND;

      -- FETCH overwrites the whole record, including sid (the callers in
      -- this script select NULL for it), so sid is set after every fetch.
      l_row.sid := l_sid;
      PIPE ROW (l_row);
    END LOOP;

    RETURN;
  END test_ptf;
END parallel_ptf_api;
/
SHOW ERRORS
PROMPT
PROMPT Serial Execution
PROMPT ================
-- Counts the piped rows per session id; this query carries no parallel hint.
SELECT sid,
       COUNT(*)
FROM   TABLE(
         parallel_ptf_api.test_ptf(
           CURSOR(
             SELECT t1.id, t1.description, t2.id, t2.description, NULL
             FROM   parallel_test t1,
                    parallel_test t2
             WHERE  t1.id = t2.id
           )
         )
       ) ptf
GROUP BY sid;
PROMPT
PROMPT Parallel Execution
PROMPT ==================
-- Same count per session id, but the cursor query asks for a parallel scan
-- of t1 (degree 5) through the hint.
SELECT sid,
       COUNT(*)
FROM   TABLE(
         parallel_ptf_api.test_ptf(
           CURSOR(
             SELECT /*+ parallel(t1,5) */ t1.id, t1.description, t2.id, t2.description, NULL
             FROM   parallel_test t1,
                    parallel_test t2
             WHERE  t1.id = t2.id
           )
         )
       ) ptf
GROUP BY sid;
PROMPT
PROMPT Parallel Execution 2
PROMPT ==================
set serveroutput on;
-- Feeds the pipelined function from a cursor variable that was opened with
-- dynamic SQL, then prints one "sid, count" line per session id.
declare
  v_sids   ton := ton();
  v_counts ton := ton();
  -- A strongly typed cursor variable such as
  -- parallel_ptf_api.t_parallel_test_ref_cursor cannot be opened for a dynamic
  -- SQL string, hence the weakly typed SYS_REFCURSOR.
  v_cur    sys_refcursor;

  -- Opens p_refCursor for the hinted self join of parallel_test.
  -- The statement text is kept flush left so the SQL text stays unchanged.
  procedure OpenCursor(p_refCursor out sys_refcursor)
  is
  begin
    open p_refCursor for 'SELECT /*+ parallel(t1,5) */ t1.id,
t1.description, t2.id, t2.description, null
FROM parallel_test t1,
parallel_test t2
where t1.id = t2.id';
  end;

  -- Releases v_cur if it is still open; safe to call more than once.
  procedure CloseCursor
  is
  begin
    if v_cur%isopen then
      close v_cur;
    end if;
  end;
begin
  OpenCursor(v_cur);

  SELECT sid, count(*)
  bulk collect into v_sids, v_counts
  FROM TABLE(parallel_ptf_api.test_ptf(v_cur)) t2
  GROUP BY sid;

  -- This block opened the cursor, so it is also the one that closes it.
  CloseCursor;

  -- BULK COLLECT fills the collections densely from index 1. Looping over
  -- 1 .. COUNT does nothing for an empty result, whereas FIRST .. LAST would
  -- raise ORA-06502 because both bounds are NULL then.
  for i in 1 .. v_sids.count loop
    dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
  end loop;
exception
  when others then
    -- Do not leak the cursor when the query fails; the error is re-raised.
    CloseCursor;
    raise;
end;
/
PROMPT
PROMPT Parallel Execution 3
PROMPT ==================
set serveroutput on;
-- Creates the subtype MyDoit (specification, then body) through dynamic DDL.
-- Its doit() runs the hinted parallel query with a static CURSOR expression
-- and bulk collects the session ids and row counts into the two arguments.
-- NOTE(review): presumably the DDL is dynamic to get around the dependencies
-- mentioned at the top of the post; confirm with the author.
-- The DDL text is reproduced exactly, including the literal that continues
-- flush left on the following line.
declare
  l_ddl varchar2(4096 char);
begin
  -- Type specification: MyDoit overrides doit() inherited from BaseDoit.
  l_ddl := 'create or replace type MyDoit under BaseDoit ( ' ||
           ' overriding member procedure doit( ' ||
           ' p_sids in out nocopy ton, ' ||
           ' p_counts in out nocopy ton) ' ||
           ' )';
  execute immediate l_ddl;

  -- Type body: announce the call, then run the parallel query.
  l_ddl := 'create or replace type body MyDoit as ' ||
           ' overriding member procedure doit( ' ||
           ' p_sids in out nocopy ton, ' ||
           ' p_counts in out nocopy ton) ' ||
           ' is ' ||
           ' begin ' ||
           ' dbms_output.put_line(''MyDoit.doit() invoked''); ' ||
           ' SELECT sid, count(*) bulk collect into p_sids, p_counts ' ||
           ' FROM TABLE(parallel_ptf_api.test_ptf(CURSOR( ' ||
           ' SELECT /*+ parallel(t1,5) */ t1.id, t1.description,
t2.id, t2.description, null ' ||
           ' FROM parallel_test t1, parallel_test t2 ' ||
           ' where t1.id = t2.id ' ||
           ' ))) ' ||
           ' GROUP BY sid; ' ||
           ' end; ' ||
           ' end; ';
  execute immediate l_ddl;
end;
/
-- Calls doit() through a BaseDoit variable that holds a MyDoit instance and
-- prints one "sid, count" line per entry returned in the collections.
declare
  v_sids   ton := ton();
  v_counts ton := ton();
  instance BaseDoit;
begin
  instance := MyDoit(1);
  instance.doit(v_sids, v_counts);

  -- Looping over 1 .. COUNT does nothing when doit() leaves the collections
  -- empty, whereas FIRST .. LAST would raise ORA-06502 because both bounds
  -- are NULL for an empty collection.
  for i in 1 .. v_sids.count loop
    dbms_output.put_line (v_sids(i) || ', ' || v_counts(i));
  end loop;
end;
/
The output is:
SQL> @test
Table dropped.
Type dropped.
Type dropped.
Table created.
PL/SQL procedure successfully completed.
Type created.
Type body created.
Package created.
No errors.
Package body created.
No errors.
Serial Execution
SID COUNT(*)
---------- ----------
649 50000
Parallel Execution
SID COUNT(*)
---------- ----------
457 10124
390 10012
555 9970
389 9924
603 9970
Parallel Execution 2
649, 50000
PL/SQL procedure successfully completed.
Parallel Execution 3
PL/SQL procedure successfully completed.
MyDoit.doit() invoked
468, 9970
483, 9924
389, 10012
368, 10124
341, 9970
PL/SQL procedure successfully completed.
SQL> Received on Sat Dec 04 2010 - 08:48:05 CST
