Home » SQL & PL/SQL » SQL & PL/SQL » DBMS_SCHEDULER.JOB => How to commit only once (Oracle 11.2.0.2.0 )
DBMS_SCHEDULER.JOB => How to commit only once [message #516141] |
Fri, 15 July 2011 03:57  |
 |
ric90
Messages: 42 Registered: May 2011 Location: Belfort
|
Member |
|
|
Hello,
I have been using dbms_scheduler for a while to parallelize some large processing jobs.
Now I have created a new procedure to parallelize inserts into 40 tables.
I create a sort of thread pool, and when the pool has finished, if there was no exception, I commit.
But when there are exceptions in my pool, I would like to roll back...
However, all the jobs run in new sessions, which is what allows me to parallelize the processing...
Is there a way to control commit and rollback using dbms_scheduler (maybe using different job classes)?
I post my code here:
DECLARE
    -- For every MATRICULE_RH found in the work table RRGQTZY_W<n>, starts one scheduler
    -- job per information code, busy-waits until no job whose name contains the
    -- matricule is left in USER_SCHEDULER_JOBS, raises -20100 if any job has a run-detail
    -- row whose status is not 'SUCCEEDED', and otherwise deletes the matricule's rows from
    -- the work table and commits. On any error for a matricule the local transaction is
    -- rolled back, the jobs of the pool are dropped on a best-effort basis, and the loop
    -- moves on to the next matricule.
    --
    -- NOTE(review): this block is an excerpt. p_dateChargement, NOM_PACKAGE, LOG_LEVEL,
    -- tab_codeInformation and CONSTANTS are referenced but not declared here; they
    -- presumably come from the enclosing package -- confirm before running standalone.
    p_avantderniermatcle NUMBER := 0;
    rec_matcle           SYS_REFCURSOR;
    TYPE t_matcle IS TABLE OF VARCHAR2(10);
    tab_matcle           t_matcle;
    cpt_running_thread   NUMBER;
    v_logId              NUMBER;
    num_thread           NUMBER := 0;
    SUCCESS_THREAD CONSTANT VARCHAR2(30) := 'SUCCEEDED';
    -- Names of the jobs started for the matricule currently being processed.
    TYPE table_varchar IS TABLE OF VARCHAR2(50) INDEX BY PLS_INTEGER;
    thread_pool          table_varchar;
    -- logger
    NOM_TRT VARCHAR2(50) := 'LOAD_ZY_MATCLE_'|| p_avantDernierMatcle;
    pCTX    PLOG.LOG_CTX;
BEGIN
    pCTX := PLOG.init(NOM_PACKAGE||'.'||NOM_TRT, LOG_LEVEL);

    OPEN rec_matcle FOR ('select distinct MATRICULE_RH FROM RRGQTZY_W'||p_avantderniermatcle);
    FETCH rec_matcle BULK COLLECT INTO tab_matcle;
    -- Everything has been fetched; release the cursor instead of leaving it open.
    CLOSE rec_matcle;

    IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, 'NB MATCLE A TRAITER <'||tab_matcle.COUNT||'>'); END IF;

    IF tab_matcle.COUNT > 0 THEN
        FOR j IN tab_matcle.FIRST..tab_matcle.LAST LOOP
            IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, 'DEBUT TRAITEMENT <'||tab_matcle(j)||'>'); END IF;
            BEGIN
                -- Start one job per information code for this matricule.
                FOR i IN tab_codeInformation.FIRST..tab_codeInformation.LAST LOOP
                    thread_pool(i) := dbms_scheduler.generate_job_name('P_'||tab_matcle(j)||'_'||tab_codeInformation(i)||'_');
                    DBMS_SCHEDULER.CREATE_JOB (
                        job_name   => thread_pool(i),
                        job_type   => 'PLSQL_BLOCK',
                        job_action => 'BEGIN'||CHR(10)
                                    ||'RRG.RRGQAZY.TRAITE_INFORMATION('''||tab_matcle(j)||''',TO_TIMESTAMP('''||TO_CHAR(p_dateChargement, CONSTANTS.FMT_DATE)||''', RRG.CONSTANTS.FMT_DATE),'||p_avantDernierMatcle||','''||tab_codeInformation(i)||''');'||CHR(10)
                                    ||'END;',
                        auto_drop  => TRUE,
                        enabled    => FALSE,
                        comments   => 'P_'||tab_matcle(j)||'_'||tab_codeInformation(i));
                    DBMS_SCHEDULER.set_attribute(
                        name      => thread_pool(i),
                        attribute => 'raise_events',
                        VALUE     => DBMS_SCHEDULER.job_failed);
                    DBMS_SCHEDULER.ENABLE(thread_pool(i));
                END LOOP;

                -- Busy-wait until no job whose name contains the matricule remains
                -- (the jobs are created with auto_drop => TRUE).
                LOOP
                    SELECT COUNT(1) INTO cpt_running_thread
                      FROM USER_SCHEDULER_JOBS
                     WHERE JOB_NAME LIKE '%'||tab_matcle(j)||'%';
                    EXIT WHEN cpt_running_thread = 0;
                END LOOP;

                -- Any run-detail row that is not SUCCEEDED fails the whole matricule.
                num_thread := thread_pool.FIRST;
                WHILE num_thread IS NOT NULL LOOP
                    SELECT COUNT(1)
                      INTO v_logId
                      FROM USER_SCHEDULER_JOB_RUN_DETAILS
                     WHERE JOB_NAME = thread_pool(num_thread)
                       AND STATUS <> SUCCESS_THREAD;
                    IF v_logId > 0 THEN
                        RAISE_APPLICATION_ERROR(-20100, 'SOME THREADS FAILED');
                    END IF;
                    num_thread := thread_pool.NEXT(num_thread);
                END LOOP;
                thread_pool.DELETE;

                -- The record is complete: delete it from the work table and commit.
                EXECUTE IMMEDIATE 'DELETE RRGQTZY_W'||p_avantderniermatcle||' A WHERE A.DATE_TRAITEMENT = :p_dateTrtt AND A.MATRICULE_RH = :matcle'
                    USING p_dateChargement, tab_matcle(j);
                COMMIT;
                IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, 'FIN TRAITEMENT <'||tab_matcle(j)||'>'); END IF;
            EXCEPTION
                WHEN OTHERS THEN
                    -- Error on this record.
                    ROLLBACK;
                    -- Best-effort removal of the jobs of this pool. The pool must still be
                    -- populated here: the original code emptied it first, so this loop
                    -- never executed and no job was ever dropped.
                    num_thread := thread_pool.FIRST;
                    WHILE num_thread IS NOT NULL LOOP
                        BEGIN
                            dbms_scheduler.drop_job(thread_pool(num_thread), TRUE);
                        EXCEPTION
                            WHEN OTHERS THEN
                                BEGIN
                                    dbms_scheduler.STOP_job(thread_pool(num_thread), TRUE);
                                    dbms_scheduler.drop_job(thread_pool(num_thread), TRUE);
                                EXCEPTION
                                    WHEN OTHERS THEN
                                        -- Deliberately ignored: the job may already be gone.
                                        NULL;
                                END;
                        END;
                        num_thread := thread_pool.NEXT(num_thread);
                    END LOOP;
                    -- Only now reset the pool for the next matricule.
                    thread_pool.DELETE;
                    IF plog.iserrorenabled(pCTX) THEN plog.ERROR(pCTX, 'TRAITEMENT DU DOSSIER INCOMPLET <'||tab_matcle(j)||'>'); END IF;
                    plog.ERROR();
            END;
        END LOOP;
    END IF;
END;
|
|
|
|
Re: DBMS_SCHEDULER.JOB => How to commit only once [message #516166 is a reply to message #516141] |
Fri, 15 July 2011 05:23   |
John Watson
Messages: 8988 Registered: January 2010 Location: Global Village
|
Senior Member |
|
|
Ric, this isn't directly relevant to your question, but I would be interested to know if you considered using lightweight jobs for this. I have never found a situation where I was certain lightweight jobs would be appropriate, but they might be a good way of implementing your technique of creating/enabling/dropping many jobs.
|
|
|
|
Re: DBMS_SCHEDULER.JOB => How to commit only once [message #516816 is a reply to message #516707] |
Thu, 21 July 2011 01:30   |
John Watson
Messages: 8988 Registered: January 2010 Location: Global Village
|
Senior Member |
|
|
Hi, man - is it possible for you to post your solution? As I said, I've been looking for an example where lightweight jobs really worked, and this sounds like a good one.
Can you put any figures to how long the process took before and after the change? How many jobs are there?
|
|
|
Re: DBMS_SCHEDULER.JOB => How to commit only once [message #517052 is a reply to message #516816] |
Fri, 22 July 2011 02:48   |
 |
ric90
Messages: 42 Registered: May 2011 Location: Belfort
|
Member |
|
|
Hi there
I am posting my solution; it is not very generic, but it may be useful. Sorry for the length of this post.
I have a lot of data to load into the database, using generic tables (all with the same columns / datatypes) => 34 tables to load, using 10 temp tables (**_Wx).
I launch this treatment (LOAD) 10 times, and each run launches 34 LIGHTWEIGHT jobs.
So I have 340 processes running in parallel (//).
I load one ID (34 tables with the functional process) in between 0.4s and 1.5s (in fact 10 IDs, because I have 10 sqlplus sessions running in parallel) versus 4.5s with REGULAR jobs.
I don't have a lot of rows for each ID (between 1 and 150). But the problem was to load every table for each ID without wasting time.
For your information, without job processes, it takes 7 sec per ID.
I have to process 150 000 IDs.
Here is my code:
CREATE OR REPLACE PACKAGE RIBICSZY AS
/***
* @Comments : Data processing (ZY)
********
* @Modification : 10/05/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.01
* @Object : Created this package
********
* @Modification : 21/07/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.03
* @Object : Lightweight Job
********
*******/

    -- Package-level state and logging settings.
    VERSION     CHAR(7)      := '0.03';
    step        NUMBER       := -1;
    NOM_PACKAGE VARCHAR2(50) := 'RIBICSZY';
    LOG_LEVEL   NUMBER       := PLOGPARAM.DEFAULT_LEVEL;

    -- Collection of work-table rows, and collection of 4-character information codes.
    TYPE t_newInformation  IS TABLE OF RIBICSZY_W0%ROWTYPE;
    TYPE t_codeInformation IS TABLE OF VARCHAR2(4);

    /**
    Set to TRUE to insert into the ZY tables through cursors.
    Slower, but safer.
    **/
    executeCursor CONSTANT BOOLEAN := TRUE;
    -- When TRUE, LOAD_ZY_Id waits for its jobs and checks their outcome.
    executeThread CONSTANT BOOLEAN := TRUE;

    /**
    Information codes, one scheduler program/job per code. List obtained with:
    select substr(table_name,6) FROM user_tables WHERE instr(table_name, 'RIBICSZY') > 0 and instr(table_name, '_') <= 0
    order by 1
    **/
    tab_codeInformation t_codeInformation := t_codeInformation(
        'ZYAF', 'ZYAG', 'ZYCA', 'ZYCO', 'ZYDI', 'ZYEC', 'ZYES', 'ZYSB',
        'ZYTL', 'ZYU5', 'ZYVE', 'ZYWA', 'ZYWB', 'ZYWC', 'ZYW0', 'ZY00',
        'ZY01', 'ZY05', 'ZY06', 'ZY07', 'ZY1S', 'ZY1V', 'ZY10', 'ZY11',
        'ZY12', 'ZY19', 'ZY3B', 'ZY3C', 'ZY3G', 'ZY3Y', 'ZY38', 'ZY4I',
        'ZY4K'
    );

    -- Entry point: creates the scheduler programs, loads the IDs, drops the programs.
    PROCEDURE LOAD(p_avantDernierId IN NUMBER, p_dateChargement DATE);

    -- Loads every ID present in the work table RIBICSZY_W<p_avantDernierId>.
    PROCEDURE LOAD_ZY_Id(p_dateChargement IN DATE, p_avantDernierId IN NUMBER);

    PROCEDURE TRAITE_REJET(p_dateChargement IN TIMESTAMP, p_avantDerrnierId IN NUMBER);

    -- Processes one information code for one ID; this is the action run by each job.
    PROCEDURE TRAITE_INFORMATION(p_Id IN VARCHAR2, p_dateChargement IN TIMESTAMP, p_avantDerrnierId IN NUMBER, p_codeInformation IN VARCHAR2);

END;
/
create or replace
PACKAGE BODY RIBICSZY AS
/****p* DROP_PROGRAM
* NAME
* DROP_PROGRAM
* SYNOPSIS
* DROP_PROGRAM(p_avant_dernier_Id IN NUMBER)
* FUNCTION
* Suppression des programmes dans le DBMS_SCHEDULER
* INPUTS
* v_avantDernierId IN NUMBER : l'avant dernier numéro du Id pour pouvoir faire tourner les 10 processes en parallèle.
* RESULT
* None
* NOTES
* @Creation : 11/07/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.01
*******
* @Modification :
* @Author :
* @Version :
* @Object :
* BUGS
*
* SEE ALSO
* http://www.oracle-base.com/articles/11g/SchedulerEnhancements_11gR1.php
* http://download.oracle.com/docs/cd/B28359_01/server.111/b28310/scheduse002.htm
******
*/
-- Drops the scheduler programs TRAITE_<p_avant_dernier_Id>_<code>, one per entry of
-- tab_codeInformation. Dropping is best-effort: a failure on one program is logged as a
-- warning and the loop continues with the next program; nothing is re-raised.
--   p_avant_dernier_Id : number embedded in the program names to drop.
PROCEDURE DROP_PROGRAM(p_avant_dernier_Id IN NUMBER) IS
    programName VARCHAR2(50);
    -- logger
    NOM_TRT VARCHAR2(50) := 'DROP_PROGRAM_<'||p_avant_dernier_Id||'>';
    pCTX    PLOG.LOG_CTX;
BEGIN
    pCTX := PLOG.init(NOM_PACKAGE||'.'||NOM_TRT, LOG_LEVEL);
    FOR i IN tab_codeInformation.FIRST..tab_codeInformation.LAST LOOP
        BEGIN
            programName := 'TRAITE_'||p_avant_dernier_Id||'_'||tab_codeInformation(i);
            -- Second argument TRUE = force.
            DBMS_SCHEDULER.DROP_PROGRAM (programName, TRUE);
        EXCEPTION
            WHEN OTHERS THEN
                -- Log the real cause. The handler previously reported every failure as
                -- "does not exist", which hid errors such as missing privileges.
                IF plog.iswarnenabled(pCTX) THEN
                    plog.warn(pCTX, programName || ' could not be dropped: ' || SQLERRM);
                END IF;
        END;
    END LOOP;
END DROP_PROGRAM;
/****p* CREATE_PROGRAM
* NAME
* CREATE_PROGRAM
* SYNOPSIS
* CREATE_PROGRAM(p_avant_dernier_Id IN NUMBER)
* FUNCTION
* Création des programmes dans le DBMS_SCHEDULER
* INPUTS
* v_avantDernierId IN NUMBER : l'avant dernier numéro du Id pour pouvoir faire tourner les 10 processes en parallèle.
* RESULT
* None
* NOTES
* @Creation : 11/07/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.01
*******
* @Modification :
* @Author :
* @Version :
* @Object :
* BUGS
*
* SEE ALSO
* http://www.oracle-base.com/articles/11g/SchedulerEnhancements_11gR1.php
* http://download.oracle.com/docs/cd/B28359_01/server.111/b28310/scheduse002.htm
******
*/
-- Creates and enables one STORED_PROCEDURE scheduler program per information code,
-- named TRAITE_<p_avant_dernier_Id>_<code>, each pointing at
-- RRG.RIBICSZY.TRAITE_INFORMATION with its four input arguments declared.
-- If EXCEPTIONS.OBJECT_ALREADY_EXIST is raised, every program of this set is dropped and
-- the creation is restarted from the beginning.
-- NOTE(review): the restart is a recursive call with no retry limit; if DROP_PROGRAM
-- cannot remove the conflicting program this would presumably recurse indefinitely.
PROCEDURE CREATE_PROGRAM(p_avant_dernier_Id IN NUMBER) IS
    v_program VARCHAR2(50);

    -- Declares one input (non-OUT) argument on the program currently being built.
    PROCEDURE add_input_argument(p_position IN PLS_INTEGER,
                                 p_name     IN VARCHAR2,
                                 p_type     IN VARCHAR2) IS
    BEGIN
        DBMS_SCHEDULER.define_program_argument(
            program_name      => v_program,
            argument_position => p_position,
            argument_name     => p_name,
            argument_type     => p_type,
            out_argument      => FALSE);
    END add_input_argument;
BEGIN
    FOR idx IN tab_codeInformation.FIRST..tab_codeInformation.LAST LOOP
        v_program := 'TRAITE_'||p_avant_dernier_Id||'_'||tab_codeInformation(idx);

        -- Created disabled: the arguments have to be defined before enabling.
        DBMS_SCHEDULER.CREATE_PROGRAM (
            program_name        => v_program,
            program_type        => 'STORED_PROCEDURE',
            program_action      => 'RRG.RIBICSZY.TRAITE_INFORMATION',
            number_of_arguments => 4,
            enabled             => FALSE,
            comments            => v_program);

        add_input_argument(1, 'p_Id',              'VARCHAR2');
        add_input_argument(2, 'p_dateChargement',  'TIMESTAMP');
        add_input_argument(3, 'p_avantDerrnierId', 'NUMBER');
        add_input_argument(4, 'p_codeInformation', 'VARCHAR2');

        DBMS_SCHEDULER.ENABLE (NAME => v_program);
    END LOOP;
EXCEPTION
    WHEN EXCEPTIONS.OBJECT_ALREADY_EXIST THEN
        -- Leftovers from an earlier run: drop the whole set and start again.
        DROP_PROGRAM(p_avant_dernier_Id);
        CREATE_PROGRAM(p_avant_dernier_Id);
END CREATE_PROGRAM;
/****p* LOAD
* NAME
* LOAD
* SYNOPSIS
* LOAD(v_avantDernierId IN NUMBER)
* FUNCTION
* Alimentation des données salariées
* INPUTS
* v_avantDernierId IN NUMBER : l'avant dernier numéro du Id pour pouvoir faire tourner les 10 processes en parallèle.
* RESULT
* None
* NOTES
* @Creation : 11/07/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.01
*******
* @Modification :
* @Author :
* @Version :
* @Object :
* BUGS
*
* SEE ALSO
*
******
*/
-- Entry point of one load run: creates the scheduler programs, loads every ID of the
-- work table through LOAD_ZY_Id, then drops the programs again.
--   p_avantDernierId : must be between 0 and 9.
--   p_dateChargement : load date, must not be NULL.
-- Raises -20100 when either argument is invalid.
PROCEDURE LOAD(p_avantDernierId IN NUMBER, p_dateChargement DATE) AS
    -- logger
    NOM_TRT VARCHAR2(50) := 'LOAD<'||p_avantDernierId||'>';
    pCTX    PLOG.LOG_CTX;
BEGIN
    pCTX := PLOG.init(NOM_PACKAGE||'.'||NOM_TRT, LOG_LEVEL);
    IF plog.isinfoenabled THEN
        plog.info(pCTX, 'Debut du traitement<'||TO_CHAR(p_dateChargement,CONSTANTS.FMT_DATE)||'>');
    END IF;

    -- Guard clause. The NULL tests are explicit so that a NULL id is rejected, exactly as
    -- it was when it fell into the ELSE branch of the original positive test.
    IF p_dateChargement IS NULL
       OR p_avantDernierId IS NULL
       OR p_avantDernierId NOT BETWEEN 0 AND 9
    THEN
        RAISE_APPLICATION_ERROR(-20100, 'ERR => WRONG NUMBER OR WRONG DATE');
    END IF;

    CREATE_PROGRAM(p_avantDernierId);
    LOAD_ZY_Id(p_dateChargement, p_avantDernierId);
    DROP_PROGRAM(p_avantDernierId);

    IF plog.isinfoenabled THEN
        plog.info(pCTX, 'Fin du traitement');
    END IF;
END LOAD;
/****p* LOAD_ZY_Id_0
* NAME
* LOAD_ZY_Id_0 --
* SYNOPSIS
* LOAD_ZY_Id_0
* FUNCTION
* Alimentation des données dont l'avant dernier numéro du Id est 0
* INPUTS
* p_dateChargement DATE
* RESULT
* None
* NOTES
* @Creation : 10/05/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.01
*******
* @Modification :
* @Author :
* @Version :
* @Object :
* BUGS
*
* SEE ALSO
*
******
*/
-- Loads every ID found in the work table RIBICSZY_W<p_avantDernierId>.
-- For each ID, one LIGHTWEIGHT scheduler job per information code is created from the
-- program TRAITE_<p_avantDernierId>_<code> and enabled. When executeThread is TRUE the
-- procedure then busy-waits until no non-disabled job whose name contains the ID is left
-- in USER_SCHEDULER_JOBS and raises -20100 if any job of the pool has a run-detail row
-- whose status is not 'SUCCEEDED'. On success the ID's rows are deleted from the work
-- table and the transaction is committed.
-- On any error for an ID: the local transaction is rolled back and, when executeThread is
-- TRUE, the jobs of the pool are dropped on a best-effort basis and the rows for this
-- ID / load date are deleted from every RIBICS<code> table. The error is logged and the
-- loop continues with the next ID; it is not re-raised.
--   p_dateChargement : load date, used to match DATE_TRAITEMENT / DINFOIDENTITE.
--   p_avantDernierId : suffix of the work table and of the program names.
PROCEDURE LOAD_ZY_Id(p_dateChargement IN DATE, p_avantDernierId IN NUMBER) AS
    rec_Id             SYS_REFCURSOR;
    TYPE t_Id IS TABLE OF VARCHAR2(10);
    tab_Id             t_Id;
    cpt_running_thread NUMBER;
    v_logId            NUMBER;
    num_thread         NUMBER := 0;
    SUCCESS_THREAD CONSTANT VARCHAR2(30) := 'SUCCEEDED';
    -- DBMS_UTILITY.get_time value taken when the current ID started.
    l_start            NUMBER;
    -- Names of the jobs started for the ID currently being processed.
    TYPE table_varchar IS TABLE OF VARCHAR2(50) INDEX BY PLS_INTEGER;
    thread_pool        table_varchar;
    -- logger
    NOM_TRT VARCHAR2(50) := 'LOAD_ZY_Id_'|| p_avantDernierId;
    pCTX    PLOG.LOG_CTX;
BEGIN
    pCTX := PLOG.init(NOM_PACKAGE||'.'||NOM_TRT, LOG_LEVEL);

    IF executeCursor = TRUE THEN
        IF plog.isinfoenabled THEN plog.info(pCTX, 'Debut du traitement with executeCursor <'||TO_CHAR(p_dateChargement,CONSTANTS.FMT_DATE)||'>'); END IF;
    ELSE
        IF plog.isinfoenabled THEN plog.info(pCTX, 'Debut du traitement without executeCursor <'||TO_CHAR(p_dateChargement,CONSTANTS.FMT_DATE)||'>'); END IF;
    END IF;

    OPEN rec_Id FOR ('select distinct ID_NUM FROM RIBICSZY_W'||p_avantdernierId);
    FETCH rec_Id BULK COLLECT INTO tab_Id;
    -- Everything has been fetched; release the cursor instead of leaving it open.
    CLOSE rec_Id;

    IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, 'NB Id A TRAITER <'||tab_Id.COUNT||'>'); END IF;

    IF tab_Id.COUNT > 0 THEN
        FOR j IN tab_Id.FIRST..tab_Id.LAST LOOP
            IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, 'DEBUT TRAITEMENT <'||tab_Id(j)||'>'); END IF;
            l_start := DBMS_UTILITY.get_time;
            BEGIN
                -- Start one lightweight job per information code for this ID.
                FOR i IN tab_codeInformation.FIRST..tab_codeInformation.LAST LOOP
                    thread_pool(i) := dbms_scheduler.generate_job_name('P_'||tab_Id(j)||'_'||tab_codeInformation(i)||'_');
                    DBMS_SCHEDULER.CREATE_JOB (
                        job_name     => thread_pool(i),
                        program_name => 'TRAITE_'||p_avantdernierId||'_'||tab_codeInformation(i),
                        job_style    => 'LIGHTWEIGHT',
                        auto_drop    => TRUE,
                        enabled      => FALSE,
                        comments     => thread_pool(i));
                    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (
                        job_name          => thread_pool(i),
                        argument_position => 1,
                        argument_value    => tab_Id(j));
                    -- NOTE(review): to_timestamp() on a DATE presumably goes through an
                    -- implicit, NLS-dependent string conversion -- confirm that the
                    -- session formats keep the time portion.
                    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (
                        job_name          => thread_pool(i),
                        argument_position => 2,
                        argument_value    => to_timestamp(p_dateChargement));
                    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (
                        job_name          => thread_pool(i),
                        argument_position => 3,
                        argument_value    => p_avantDernierId);
                    DBMS_SCHEDULER.SET_JOB_ARGUMENT_VALUE (
                        job_name          => thread_pool(i),
                        argument_position => 4,
                        argument_value    => tab_codeInformation(i));
                    DBMS_SCHEDULER.set_attribute(
                        name      => thread_pool(i),
                        attribute => 'raise_events',
                        VALUE     => DBMS_SCHEDULER.job_failed);
                    DBMS_SCHEDULER.ENABLE(thread_pool(i));
                END LOOP;

                IF executeThread THEN
                    -- Busy-wait until no non-disabled job whose name contains the ID
                    -- remains (the jobs are created with auto_drop => TRUE).
                    LOOP
                        SELECT COUNT(1) INTO cpt_running_thread
                          FROM USER_SCHEDULER_JOBS
                         WHERE JOB_NAME LIKE '%'||tab_Id(j)||'%'
                           AND STATE <> 'DISABLED';
                        EXIT WHEN cpt_running_thread = 0;
                    END LOOP;

                    -- Any run-detail row that is not SUCCEEDED fails the whole ID.
                    num_thread := thread_pool.FIRST;
                    WHILE num_thread IS NOT NULL LOOP
                        SELECT COUNT(1)
                          INTO v_logId
                          FROM USER_SCHEDULER_JOB_RUN_DETAILS
                         WHERE JOB_NAME = thread_pool(num_thread)
                           AND STATUS <> SUCCESS_THREAD;
                        IF v_logId > 0 THEN
                            RAISE_APPLICATION_ERROR(-20100, 'SOME THREADS FAILED');
                        END IF;
                        num_thread := thread_pool.NEXT(num_thread);
                    END LOOP;
                END IF;
                thread_pool.DELETE;

                -- The record is complete: delete it from the work table and commit.
                EXECUTE IMMEDIATE 'DELETE RIBICSZY_W'||p_avantdernierId||' A WHERE A.DATE_TRAITEMENT = :p_dateTrtt AND A.ID_NUM = :Id'
                    USING p_dateChargement, tab_Id(j);
                COMMIT;
                IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, 'FIN TRAITEMENT <'||tab_Id(j)||'>'||chr(9)||'DUREE (hsec) <'||(DBMS_UTILITY.get_time - l_start)||'>'); END IF;
            EXCEPTION
                WHEN OTHERS THEN
                    -- Error on this record.
                    ROLLBACK;
                    IF executeThread THEN
                        -- Best-effort removal of the jobs of this pool. The pool must
                        -- still be populated here: the original code emptied it first,
                        -- so this loop never executed and no job was ever dropped.
                        num_thread := thread_pool.FIRST;
                        WHILE num_thread IS NOT NULL LOOP
                            BEGIN
                                dbms_scheduler.drop_job(thread_pool(num_thread), TRUE);
                            EXCEPTION
                                WHEN OTHERS THEN
                                    BEGIN
                                        dbms_scheduler.STOP_job(thread_pool(num_thread), TRUE);
                                        dbms_scheduler.drop_job(thread_pool(num_thread), TRUE);
                                    EXCEPTION
                                        WHEN OTHERS THEN
                                            -- Deliberately ignored: the job may already be gone.
                                            NULL;
                                    END;
                            END;
                            num_thread := thread_pool.NEXT(num_thread);
                        END LOOP;

                        -- Manual "rollback" of the jobs' work: every job has already
                        -- committed its own transaction, so its rows are deleted here.
                        FOR i IN tab_codeInformation.FIRST..tab_codeInformation.LAST LOOP
                            EXECUTE IMMEDIATE
                                'DELETE FROM RIBICS'||tab_codeInformation(i)||' WHERE ID_NUM = :1'||CHR(10)
                                ||' AND AVANT_DERNIER_Id = :2'||CHR(10)
                                ||' AND DINFOIDENTITE = :3' USING tab_Id(j), p_avantdernierId, p_dateChargement;
                            COMMIT;
                        END LOOP;
                    END IF;
                    -- Only now reset the pool for the next ID.
                    thread_pool.DELETE;
                    IF plog.iserrorenabled(pCTX) THEN plog.ERROR(pCTX, 'TRAITEMENT DU DOSSIER INCOMPLET <'||tab_Id(j)||'>'||chr(9)||'DUREE (hsec) <'||(DBMS_UTILITY.get_time - l_start)||'>'); END IF;
                    plog.ERROR();
            END;
        END LOOP;
    END IF;
END LOAD_ZY_Id;
/****p* TRAITE_INFORMATION
* NAME
* TRAITE_INFORMATION
* SYNOPSIS
* TRAITE_INFORMATION(p_Id IN VARCHAR2, p_dateChargement IN TIMESTAMP, p_avantDerrnierId IN NUMBER, p_codeInformation IN VARCHAR2)
* FUNCTION
* Processes one ZY information code for one ID (ACTIVE/INACTIVE handling):
* reads the newly loaded rows from the work table RIBICSZY_W<p_avantDerrnierId>,
* compares them with the most recent rows of RIBICS<p_codeInformation> for the
* same ID, and inserts the resulting rows stamped with p_dateChargement.
* INPUTS
* p_Id IN VARCHAR2 : the ID (ID_NUM) to process
* p_dateChargement IN TIMESTAMP : load date; must not be NULL
* p_avantDerrnierId IN NUMBER : suffix of the work table RIBICSZY_W<n>
* p_codeInformation IN VARCHAR2 : code of the information to process
* RESULT
* None
* NOTES
* @Creation : 25/05/2011
* @Author : Richard BIDET - RiBiCS - Computing Services
* @Version : v0.01
*******
* @Modification :
* @Author :
* @Version :
* @Object :
*******
* BUGS
*
* SEE ALSO
*
******
*/
PROCEDURE TRAITE_INFORMATION(p_Id IN VARCHAR2, p_dateChargement IN TIMESTAMP, p_avantDerrnierId IN NUMBER, p_codeInformation IN VARCHAR2) IS
v_sqlstt VARCHAR2(32767);
v_sqlSttLog VARCHAR2(32767) ;
-- Despite the p_ prefix this is a local: the collection of newly loaded rows.
p_newInformation t_zy_w;
cpt_rows NUMBER;
NOM_TRT VARCHAR2(50) := 'P_TRAITE_INFORMATION_'||p_codeInformation||' <'||p_avantDerrnierId||'>';
pCTX PLOG.LOG_CTX ;
BEGIN
pCTX:= PLOG.init(NOM_PACKAGE||'.'||NOM_TRT,LOG_LEVEL);
-- Raises -20100 when no load date is supplied.
IF p_dateChargement IS NULL THEN
RAISE_APPLICATION_ERROR(-20100, 'p_dateChargement ne peut etre null !!!');
END IF;
IF plog.isdebugenabled(pCTX) THEN plog.DEBUG(pCTX, 'Debut du traitement <'||p_codeInformation||'> - <'||TO_CHAR(p_dateChargement, CONSTANTS.FMT_DATE)||'> - <'||p_avantDerrnierId||'>'); END IF;
-- Select the new information: the work-table rows for this code, load date and ID are
-- collected into p_newInformation. The table suffix and the information code are
-- concatenated into the statement; the load date and the ID are bound.
v_sqlStt := '
SELECT CAST
(MULTISET (
SELECT
CODE_INFORMATION,
ID_NUM,
INFORMATION_DATA,
DATE_TRAITEMENT,
LINE_NUMBER
FROM RIBICSZY_W'||p_avantDerrnierId||' A
WHERE A.code_information = '''||p_codeInformation ||'''
AND A.DATE_TRAITEMENT = :p_dateTrtt
AND A.ID_NUM = :Id
) AS T_ZY_W) FROM DUAL';
IF plog.isdebugenabled THEN plog.DEBUG(pCTX, v_sqlStt); END IF;
EXECUTE IMMEDIATE v_sqlStt INTO p_newInformation USING p_dateChargement,p_Id;
cpt_rows := p_newInformation.count;
IF plog.isdebugenabled(pCTX) THEN plog.DEBUG(pCTX, 'NB_LIGNES A TRAITER <'||p_codeInformation||'> : '||cpt_rows); END IF;
-- Build an anonymous PL/SQL block targeting RIBICS<p_codeInformation>. The generated
-- block:
--   1. loads into v_exist the rows of this ID that rank first by DINFOIDENTITE DESC;
--   2. re-inserts the v_exist rows whose ACTIF = CONSTANTS.INACTIF with the new load date;
--   3. INTERSECT: active existing rows also present in the new data are re-inserted with
--      the new load date, keeping their DINFOIDENTITE_INFORMATION and ACTIF values;
--   4. EXIST MINUS NEW: active existing rows absent from the new data are inserted with
--      CONSTANTS.INACTIF and DINFOIDENTITE_INFORMATION set to the load date;
--   5. NEW MINUS EXIST: new rows with no active existing match are inserted with
--      CONSTANTS.ACTIF and DINFOIDENTITE_INFORMATION set to the load date.
-- Any error inside the generated block is logged and re-raised.
-- NOTE(review): the NO_DATA_FOUND handler wraps a SELECT ... FROM DUAL, so it presumably
-- never fires; the "no existing data" case is handled by the v_exist.COUNT test instead.
-- NOTE(review): the generated INSERTs do not list AVANT_DERNIER_Id although the generated
-- SELECT filters on it -- presumably populated by a default or trigger; confirm.
-- NOTE(review): p_Id and p_codeInformation are concatenated into the block text rather
-- than bound; confirm that callers only pass trusted values.
v_sqlstt :=
' DECLARE '||CHR(10)
||' v_exist T_ZY;'||CHR(10)
||' cpt_rows NUMBER := 0;'||CHR(10)
||' pCTX PLOG.LOG_CTX;'||CHR(10)
||' p_newInformation t_zy_w := :p_newInformation;'||CHR(10)
||' BEGIN '||CHR(10)
||' pCTX:= PLOG.init(''TRAITE_INFORMATION<'||p_codeInformation||'><'||p_Id||'>'','||LOG_LEVEL||');'||CHR(10)
||' BEGIN'||CHR(10)
||' SELECT CAST (MULTISET('||CHR(10)
||' SELECT DISTINCT ID_NUM, DINFOIDENTITE, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA'||CHR(10)
||' FROM (SELECT existt.ID_NUM, existt.DINFOIDENTITE, existt.DINFOIDENTITE_INFORMATION, existt.ACTIF, existt.INFORMATION_DATA'||CHR(10)
||' ,rank() OVER(PARTITION BY existt.ID_NUM ORDER BY existt.DINFOIDENTITE DESC) rnk'||CHR(10)
||' FROM RIBICS'||p_codeInformation||' existt'||CHR(10)
||' WHERE AVANT_DERNIER_Id = '||p_avantDerrnierId||CHR(10)
||' AND existt.ID_NUM = '''||p_Id||''''||CHR(10)
||' ) WHERE rnk=1) AS T_ZY) INTO v_exist FROM DUAL;'||CHR(10)
||' EXCEPTION'||CHR(10)
||' WHEN NO_DATA_FOUND THEN '||CHR(10)
||' IF plog.iswarnenabled(pCTX) THEN plog.warn(pCTX, ''Pas de données existantes. Toutes les données reçues sont nouvelles''); END IF;'||CHR(10)
||' END;'||CHR(10)
||' IF v_exist.COUNT > 0 THEN'||CHR(10)
||' IF plog.isdebugenabled(pCTX) THEN'||CHR(10)
||' FOR m IN v_exist.FIRST..v_exist.LAST LOOP'||CHR(10)
||' plog.debug(pCTX, v_exist(m).ID_NUM||chr(9)||v_exist(m).DINFOIDENTITE||chr(9)||v_exist(m).DINFOIDENTITE_INFORMATION||chr(9)||v_exist(m).ACTIF||chr(9)||v_exist(m).INFORMATION_DATA);'||CHR(10)
||' END LOOP;'||CHR(10)
||' END IF;'||CHR(10)
||' ELSE'||CHR(10)
||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX,''Pas de données existantes. Toutes les données reçues sont nouvelles''); END IF;'||CHR(10)
||' END IF;'||CHR(10)
||' INSERT INTO RIBICS'||p_codeInformation||'(ID_NUM, DINFOIDENTITE, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA)'||CHR(10)
||' (SELECT ID_NUM, :p_dateChargement, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA FROM TABLE(v_exist) WHERE ACTIF = CONSTANTS.INACTIF);'||CHR(10)
||' cpt_rows := SQL%ROWCOUNT;'||CHR(10)
||' IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, ''NB_LIGNES INACTIVE INSERTED : ''||cpt_rows); END IF;'||CHR(10)
||' IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, ''DEBUT INTERSECT''); END IF;'||CHR(10)
||' cpt_rows := 0;'||CHR(10)
||' FOR rec_inter IN ('||CHR(10)
||' SELECT ID_NUM, INFORMATION_DATA FROM TABLE(v_exist) WHERE ACTIF = CONSTANTS.ACTIF'||CHR(10)
||' INTERSECT'||CHR(10)
||' SELECT ID_NUM, INFORMATION_DATA FROM TABLE(p_newInformation))'||CHR(10)
||' LOOP'||CHR(10)
||' BEGIN'||CHR(10)
||' INSERT INTO RIBICS'||p_codeInformation||'(ID_NUM, DINFOIDENTITE, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA)'||CHR(10)
||' (SELECT ID_NUM, :p_dateChargement, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA FROM TABLE(v_exist) '||CHR(10)
||' WHERE ACTIF = CONSTANTS.ACTIF AND ID_NUM = rec_inter.ID_NUM AND INFORMATION_DATA=rec_inter.INFORMATION_DATA);'||CHR(10)
||' cpt_rows := cpt_rows + SQL%ROWCOUNT;'||CHR(10)
||' EXCEPTION'||CHR(10)
||' WHEN DUP_VAL_ON_INDEX THEN '||CHR(10)
||' IF plog.iswarnenabled(pCTX) THEN plog.warn(pCTX, ''DUPVAL_ON_INDEX => ''||rec_inter.ID_NUM); END IF;'||CHR(10)
||' RAISE;'||CHR(10)
||' WHEN OTHERS THEN '||CHR(10)
||' IF plog.iserrorenabled(pCTX) THEN plog.error(pCTX, TO_CHAR(:p_dateChargement, CONSTANTS.FMT_DATE)||chr(9)||rec_inter.ID_NUM||chr(9)||rec_inter.INFORMATION_DATA); END IF;'||CHR(10)
||' RAISE;'||CHR(10)
||' END;'||CHR(10)
||' END LOOP;'||CHR(10)
||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, ''NB_LIGNES INSERTED for INTERSECT: ''||cpt_rows); END IF;'||CHR(10)
||' IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, ''DEBUT EXIST MINUS NEW''); END IF;'||CHR(10)
||' cpt_rows := 0;'||CHR(10)
||' FOR rec_ex_min_new IN ('||CHR(10)
||' SELECT ID_NUM, INFORMATION_DATA FROM TABLE(v_exist) WHERE ACTIF = CONSTANTS.ACTIF'||CHR(10)
||' MINUS'||CHR(10)
||' SELECT ID_NUM, INFORMATION_DATA FROM TABLE(p_newInformation))'||CHR(10)
||' LOOP'||CHR(10)
||' BEGIN'||CHR(10)
||' INSERT INTO RIBICS'||p_codeInformation||'(ID_NUM, DINFOIDENTITE, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA)'||CHR(10)
||' (SELECT ID_NUM, :p_dateChargement, :p_dateChargement, CONSTANTS.INACTIF, INFORMATION_DATA FROM TABLE(v_exist)'||CHR(10)
||' WHERE ACTIF = CONSTANTS.ACTIF AND ID_NUM = rec_ex_min_new.ID_NUM AND INFORMATION_DATA=rec_ex_min_new.INFORMATION_DATA);'||CHR(10)
||' cpt_rows := cpt_rows + SQL%ROWCOUNT;'||CHR(10)
||' EXCEPTION'||CHR(10)
||' WHEN DUP_VAL_ON_INDEX THEN '||CHR(10)
||' IF plog.iswarnenabled(pCTX) THEN plog.warn(pCTX, ''DUPVAL_ON_INDEX => ''||rec_ex_min_new.ID_NUM); END IF;'||CHR(10)
||' RAISE;'||CHR(10)
||' WHEN OTHERS THEN '||CHR(10)
||' IF plog.iserrorenabled(pCTX) THEN plog.error(pCTX, TO_CHAR(:p_dateChargement, CONSTANTS.FMT_DATE)||chr(9)||rec_ex_min_new.ID_NUM||chr(9)||rec_ex_min_new.INFORMATION_DATA); END IF;'||CHR(10)
||' RAISE;'||CHR(10)
||' END;'||CHR(10)
||' END LOOP;'||CHR(10)
||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, ''NB_LIGNES INSERTED for EXIST MINUS NEW (ACTIF=0): ''||cpt_rows); END IF;'||CHR(10)
||' IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, ''DEBUT NEW MINUS EXIST''); END IF;'||CHR(10)
||' cpt_rows := 0;'||CHR(10)
||' FOR rec_new_min_ex IN ('||CHR(10)
||' SELECT ID_NUM, INFORMATION_DATA FROM TABLE(p_newInformation)'||CHR(10)
||' MINUS'||CHR(10)
||' SELECT ID_NUM, INFORMATION_DATA FROM TABLE(v_exist) WHERE ACTIF = CONSTANTS.ACTIF)'||CHR(10)
||' LOOP'||CHR(10)
||' BEGIN'||CHR(10)
||' INSERT INTO RIBICS'||p_codeInformation||'(ID_NUM, DINFOIDENTITE, DINFOIDENTITE_INFORMATION, ACTIF, INFORMATION_DATA)'||CHR(10)
||' (SELECT ID_NUM, :p_dateChargement, :p_dateChargement, CONSTANTS.ACTIF, INFORMATION_DATA FROM TABLE(p_newInformation) WHERE ID_NUM = rec_new_min_ex.ID_NUM AND INFORMATION_DATA=rec_new_min_ex.INFORMATION_DATA);'||CHR(10)
||' cpt_rows := cpt_rows + SQL%ROWCOUNT;'||CHR(10)
||' EXCEPTION'||CHR(10)
||' WHEN DUP_VAL_ON_INDEX THEN '||CHR(10)
||' IF plog.iswarnenabled(pCTX) THEN plog.warn(pCTX, ''DUPVAL_ON_INDEX => ''||rec_new_min_ex.ID_NUM); END IF;'||CHR(10)
||' RAISE;'||CHR(10)
||' WHEN OTHERS THEN '||CHR(10)
||' IF plog.iserrorenabled(pCTX) THEN plog.error(pCTX, TO_CHAR(:p_dateChargement, CONSTANTS.FMT_DATE)||chr(9)||rec_new_min_ex.ID_NUM||chr(9)||rec_new_min_ex.INFORMATION_DATA); END IF;'||CHR(10)
||' RAISE;'||CHR(10)
||' END;'||CHR(10)
||' END LOOP;'||CHR(10)
||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, ''NB_LIGNES INSERTED for NEW MINUS EXIST (ACTIF=1): ''||cpt_rows); END IF;'||CHR(10)
||' EXCEPTION'||CHR(10)
||' WHEN OTHERS THEN '||CHR(10)
||' -- TODO'||CHR(10)
||' plog.error();'||CHR(10)
||' RAISE;'||CHR(10)
||' END;'||CHR(10);
IF plog.isdebugenabled(pCTX) THEN
-- v_sqlSttLog := v_sqlStt;
-- WHILE LENGTH(v_sqlSttLog)>0 LOOP
-- plog.debug(pCTX, SUBSTR(v_sqlSttLog,1,2000));
-- v_sqlSttLog := SUBSTR(v_sqlSttLog,2001);
-- END LOOP;
plog.debug(pCTX, v_sqlStt);
END IF;
-- Mind the order of the bind variables: the generated text is a PL/SQL block, so the
-- two values below are supplied once each, for :p_newInformation and then
-- :p_dateChargement, even though :p_dateChargement appears many times in the text.
EXECUTE IMMEDIATE v_sqlStt USING p_newInformation,p_dateChargement;
EXCEPTION
-- Both handlers log (when error logging is enabled) and re-raise to the caller.
WHEN DUP_VAL_ON_INDEX THEN
IF plog.iserrorenabled(pCTX) THEN
plog.ERROR();
END IF;
RAISE;
WHEN OTHERS THEN
IF plog.iserrorenabled(pCTX) THEN
-- plog.ERROR(pCTX, v_sqlStt);
-- v_sqlSttLog := v_sqlStt;
-- WHILE LENGTH(v_sqlSttLog)>0 LOOP
-- plog.error(pCTX, SUBSTR(v_sqlSttLog,1,2000));
-- v_sqlSttLog := SUBSTR(v_sqlSttLog,2001);
-- END LOOP;
plog.ERROR();
END IF;
RAISE;
END TRAITE_INFORMATION;
END RIBICSZY;
/
[Updated on: Fri, 22 July 2011 03:02] Report message to a moderator
|
|
|
Re: DBMS_SCHEDULER.JOB => How to commit only once [message #520503 is a reply to message #517052] |
Tue, 23 August 2011 03:13   |
 |
ric90
Messages: 42 Registered: May 2011 Location: Belfort
|
Member |
|
|
Hi all,
Just to tell you that the solution posted above works fine.
The only problem is the time lost in the management of the thread pool.
In my implementation, I need to know whether all threads have completed successfully before going on to the next step.
The management of the thread pool takes 3/4 of the time of the process.
I'm looking for another solution, maybe using a CHAIN in DBMS_SCHEDULER or EVENTS, to avoid this loop:
-- Fragment quoted from LOAD_ZY_Id. Walks the job names stored in thread_pool: for each
-- one it counts the rows of USER_SCHEDULER_JOB_RUN_DETAILS whose STATUS differs from
-- SUCCESS_THREAD and raises -20100 as soon as one job has such a row. That is one query
-- per job in the pool.
-- NOTE(review): num_thread is expected to have been set to thread_pool.FIRST before this
-- loop; the initialisation is not part of the quoted fragment.
WHILE num_thread IS NOT NULL LOOP
--tab_codeInformation.FIRST..tab_codeInformation.LAST
SELECT count(1)
INTO v_logId
FROM USER_SCHEDULER_JOB_RUN_DETAILS
WHERE JOB_NAME = thread_pool(num_thread)
AND STATUS <> SUCCESS_THREAD ;
IF v_logId > 0 THEN
RAISE_APPLICATION_ERROR(-20100, 'SOME THREADS FAILED');
END IF;
num_thread := thread_pool.NEXT(num_thread);
END LOOP;
I will open a new thread for this question.
[Updated on: Tue, 23 August 2011 03:14] Report message to a moderator
|
|
|
Re: DBMS_SCHEDULER.JOB => How to commit only once [message #521467 is a reply to message #520503] |
Tue, 30 August 2011 03:23   |
 |
ric90
Messages: 42 Registered: May 2011 Location: Belfort
|
Member |
|
|
Hi there,
Just a little more information about my pseudo-threads.
I ran many tests using chains, but the result was not fast enough.
I created new event-based jobs to determine whether I have to commit or roll back the transactions of my jobs.
It runs very well, except for one thing.
The COMMIT works. I did this:
-- COMMIT
-- Fragment: builds the text of an anonymous PL/SQL block in v_sqlStt, then registers it
-- as an event-based scheduler job that runs when v_commit_condition is met on
-- SYS.SCHEDULER$_EVENT_QUEUE. The generated block deletes, for every information code,
-- the rows of RRGQT<code> matching the matricule, p_avantderniermatcle and the load
-- date, commits, and writes an error-level log entry.
-- NOTE(review): the generated block removes loaded rows and logs 'TRAITEMENT DU DOSSIER
-- INCOMPLET', which looks like the failure path although this section is labelled
-- COMMIT -- confirm that the COMMIT and ROLLBACK blocks were not swapped when pasted.
-- NOTE(review): v_commit_job_name, v_commit_condition, l_start, tab_matcle and j are
-- declared outside this fragment.
v_sqlStt := 'DECLARE'||CHR(10)
||' NOM_TRT VARCHAR2(50) := ''LOAD_ZY_MATCLE_'|| p_avantDernierMatcle||''';'||CHR(10)
||' pCTX PLOG.LOG_CTX ;'||CHR(10)
||' l_start NUMBER := '|| l_start ||';'||CHR(10)
||' p_matcle VARCHAR2(12) := ''' || tab_matcle(j) ||''';'||CHR(10)
||' p_avantderniermatcle NUMBER := ' || p_avantderniermatcle || ';'||CHR(10)
||' p_dateChargement TIMESTAMP := TO_TIMESTAMP('''||TO_CHAR(p_dateChargement, CONSTANTS.FMT_DATE)||''', RRG.CONSTANTS.FMT_DATE); '||CHR(10)
||' BEGIN'||CHR(10)
||' pCTX:= PLOG.init(NOM_TRT,RRG.RRGQAZY.LOG_LEVEL);'||CHR(10)
||' FOR i IN RRG.RRGQAZY.tab_codeInformation.FIRST..RRG.RRGQAZY.tab_codeInformation.LAST LOOP'||CHR(10)
||' EXECUTE IMMEDIATE '||CHR(10)
||' ''DELETE FROM RRGQT''||RRG.RRGQAZY.tab_codeInformation(i)||'' WHERE MATRICULE_RH = :1'||CHR(10)
||' AND AVANT_DERNIER_MATCLE = :2'||CHR(10)
||' AND DINFOIDENTITE = :3'' USING p_matcle, p_avantderniermatcle, p_dateChargement;'||CHR(10)
||' END LOOP;'||CHR(10)
||' COMMIT;'||CHR(10)
||' IF plog.iserrorenabled(pCTX) THEN plog.error(pCTX, ''TRAITEMENT DU DOSSIER INCOMPLET <'||tab_matcle(j)||'>''||chr(9)||''DUREE (hsec) <''||(DBMS_UTILITY.get_time - l_start)||''>''); END IF;'||CHR(10)
||' END;';
-- Unique job name for this matricule; the name and its trigger condition are logged.
v_commit_job_name := dbms_scheduler.generate_job_name('SUCCESS_'||tab_matcle(j)||'_');
IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, v_commit_job_name || ' : ' || v_commit_condition); END IF;
-- Created disabled so that raise_events can be set before the job is enabled.
DBMS_SCHEDULER.CREATE_JOB (
job_name => v_commit_job_name,
job_type => 'PLSQL_BLOCK',
job_action => v_sqlStt,
event_condition => v_commit_condition,
queue_spec => 'SYS.SCHEDULER$_EVENT_QUEUE, streams_queue_agent',
auto_drop => TRUE,
enabled => FALSE,
comments => 'SUCCESS_'||tab_matcle(j)||'_');
DBMS_SCHEDULER.set_attribute(
name => v_commit_job_name,
ATTRIBUTE => 'raise_events',
VALUE => DBMS_SCHEDULER.JOB_RUN_COMPLETED );
DBMS_SCHEDULER.ENABLE(v_commit_job_name);
My commit condition is generated with a table of job_names which look like this one :
20110830 09:39:38 INFO RRGQAZY.LOAD_ZY_MATCLE_4 SUCCESS_10061741_503623 : (tab.user_data.object_name = 'P_10061741_ZYAF_503589' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYAG_503590' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYCA_503591' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYCO_503592' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYDI_503593' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYEC_503594' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYES_503595' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYSB_503596' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYTL_503597' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYU5_503598' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYVE_503599' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYWA_503600' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYWB_503601' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYWC_503602' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZYW0_503603' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY00_503604' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY01_503605' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY05_503606' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND 
(tab.user_data.object_name = 'P_10061741_ZY06_503607' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY07_503608' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY1S_503609' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY1V_503610' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY10_503611' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY11_503612' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY12_503613' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY19_503614' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY3B_503615' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY3C_503616' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY3G_503617' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY3Y_503618' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY38_503619' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY4I_503620' AND tab.user_data.event_type = 'JOB_SUCCEEDED') AND (tab.user_data.object_name = 'P_10061741_ZY4K_503621' AND tab.user_data.event_type = 'JOB_SUCCEEDED')
This one works very well.
For my rollback, I wrote this:
-- ROLLBACK
-- Builds the source of a PL/SQL block and registers it as an event-driven
-- scheduler job for the dossier tab_matcle(j).
-- The generated block deletes from RRGQTZY_W<p_avantderniermatcle> the rows
-- matching DATE_TRAITEMENT = p_dateChargement and MATRICULE_RH = tab_matcle(j),
-- commits, and logs the elapsed time computed from l_start.
-- NOTE(review): the dossier id and the date are concatenated into the
-- generated source as literals; this assumes they never contain a quote.
v_sqlStt := 'DECLARE'||CHR(10)
||' NOM_TRT VARCHAR2(50) := ''LOAD_ZY_MATCLE_'|| p_avantDernierMatcle||''';'||CHR(10)
||' pCTX PLOG.LOG_CTX ;'||CHR(10)
||' l_start NUMBER := '|| l_start ||';'||CHR(10)
||' BEGIN'||CHR(10)
||' pCTX:= PLOG.init(NOM_TRT,RRG.RRGQAZY.LOG_LEVEL);'||CHR(10)
||' DELETE RRGQTZY_W'||p_avantderniermatcle||' A WHERE A.DATE_TRAITEMENT = TO_TIMESTAMP('''||TO_CHAR(p_dateChargement, CONSTANTS.FMT_DATE)||''', RRG.CONSTANTS.FMT_DATE) AND A.MATRICULE_RH = '''||tab_matcle(j)||''';'||CHR(10)
||' COMMIT;'||CHR(10)
||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, ''FIN TRAITEMENT <'||tab_matcle(j)||'>''||chr(9)||''DUREE (hsec) <''||(DBMS_UTILITY.get_time - l_start)||''>''); END IF;'||CHR(10)
||' END;';
-- Ask the scheduler for a unique job name prefixed with ROLLBACK_<dossier>_.
v_rollback_job_name := dbms_scheduler.generate_job_name('ROLLBACK_'||tab_matcle(j)||'_');
IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, v_rollback_job_name || ' : ' || v_rollback_condition); END IF;
-- The job is created disabled and is started by events read from the
-- scheduler event queue that satisfy v_rollback_condition.
-- NOTE(review): as reported in the surrounding post, the job is started once
-- per matching event, so several failed jobs start it several times.
DBMS_SCHEDULER.CREATE_JOB (
job_name => v_rollback_job_name,
job_type => 'PLSQL_BLOCK',
job_action => v_sqlStt,
event_condition => v_rollback_condition,
queue_spec => 'SYS.SCHEDULER$_EVENT_QUEUE, streams_queue_agent',
auto_drop => TRUE,
enabled => FALSE,
comments => 'ROLLBACK_'||tab_matcle(j)||'_');
-- Make this job raise its own completion events (presumably so another
-- consumer can detect when it has finished -- TODO confirm against caller).
DBMS_SCHEDULER.set_attribute(
name => v_rollback_job_name,
ATTRIBUTE => 'raise_events',
VALUE => DBMS_SCHEDULER.JOB_RUN_COMPLETED );
-- Enable only after all attributes have been set.
DBMS_SCHEDULER.ENABLE(v_rollback_job_name);
And the v_rollback_condition looks like this:
20110830 09:39:38 INFO RRGQAZY.LOAD_ZY_MATCLE_4 ROLLBACK_10061741_503622 : (tab.user_data.object_name = 'P_10061741_ZYAF_503589' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYAG_503590' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYCA_503591' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYCO_503592' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYDI_503593' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYEC_503594' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYES_503595' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYSB_503596' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYTL_503597' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYU5_503598' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYVE_503599' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYWA_503600' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYWB_503601' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYWC_503602' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZYW0_503603' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY00_503604' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY01_503605' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY05_503606' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY06_503607' AND tab.user_data.event_type = 
'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY07_503608' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY1S_503609' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY1V_503610' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY10_503611' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY11_503612' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY12_503613' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY19_503614' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY3B_503615' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY3C_503616' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY3G_503617' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY3Y_503618' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY38_503619' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY4I_503620' AND tab.user_data.event_type = 'JOB_FAILED') OR (tab.user_data.object_name = 'P_10061741_ZY4K_503621' AND tab.user_data.event_type = 'JOB_FAILED')
The only problem is that Oracle launches the job as many times as the condition matches. So, if 17 jobs fail, Oracle launches my rollback job 17 times.
How can I write my v_rollback_condition to tell Oracle to launch this job only once? Or can I tell the job itself to run only once?
Thank you in advance.
|
|
|
|
Re: DBMS_SCHEDULER.JOB => How to commit only once [message #521495 is a reply to message #521473] |
Tue, 30 August 2011 10:15  |
 |
ric90
Messages: 42 Registered: May 2011 Location: Belfort
|
Member |
|
|
Here is the result of my research. Once my jobs are enabled, I do the following to determine whether I have to commit or roll back my transactions:
DECLARE
  -- Collects the scheduler events raised by the jobs listed in thread_pool for
  -- the current dossier tab_matcle(j), counts how many succeeded and how many
  -- did not, and then submits exactly one follow-up job:
  --   * a ROLLBACK_ job (deletes the dossier's rows from every RRGQT<code>
  --     table) as soon as at least one thread job failed or was stopped;
  --   * a SUCCESS_ job (deletes the dossier's row from the work table
  --     RRGQTZY_W<n>) when every thread job succeeded.
  -- Because the decision is taken here, once, after all events were read, the
  -- follow-up job is created a single time per dossier.
  --
  -- The local declaration of the payload collection type is disabled in the
  -- original; t_queue_msq is presumably declared in an enclosing scope
  -- -- TODO confirm.
  -- TYPE t_queue_msq IS TABLE OF sys.scheduler$_event_info;
  l_dequeue_options DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_array_t;
  l_message_handle DBMS_AQ.msgid_array_t;
  l_queue_msg t_queue_msq;        -- events returned by one extra dequeue call
  l_queue_msg_full t_queue_msq;   -- all events gathered so far
  l_array_size pls_integer := thread_pool.count;
  l_error DBMS_AQ.error_array_t;
  retval PLS_INTEGER;             -- running total of dequeued events
  nb_succeed pls_integer := 0;
  nb_failed pls_integer := 0;
BEGIN
  l_dequeue_options.consumer_name := 'RRG';

  -- Only consume events whose object_name is one of this dossier's jobs:
  -- tab.user_data.object_name IN ('job1','job2',...)
  l_dequeue_options.deq_condition := 'tab.user_data.object_name IN (';
  FOR k IN thread_pool.FIRST..thread_pool.LAST LOOP
    l_dequeue_options.deq_condition := l_dequeue_options.deq_condition ||'''' || thread_pool(k)||'''';
    IF k != thread_pool.LAST THEN
      l_dequeue_options.deq_condition := l_dequeue_options.deq_condition || ',';
    ELSE
      l_dequeue_options.deq_condition := l_dequeue_options.deq_condition || ')';
    END IF;
  END LOOP;
  IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, 'l_dequeue_options.deq_condition : ' || l_dequeue_options.deq_condition); END IF;

  -- First batch goes straight into the accumulated collection.
  retval := DBMS_AQ.DEQUEUE_ARRAY (queue_name => 'SYS.SCHEDULER$_EVENT_QUEUE',
                                   dequeue_options => l_dequeue_options,
                                   array_size => l_array_size,
                                   message_properties_array => l_message_properties,
                                   payload_array => l_queue_msg_full,
                                   msgid_array => l_message_handle,
                                   error_array => l_error);
  IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, 'retval : ' || retval); END IF;

  -- Keep dequeuing until one event per thread job has been received.
  WHILE retval < thread_pool.COUNT LOOP
    retval := retval + DBMS_AQ.DEQUEUE_ARRAY (queue_name => 'SYS.SCHEDULER$_EVENT_QUEUE',
                                              dequeue_options => l_dequeue_options,
                                              array_size => l_array_size,
                                              message_properties_array => l_message_properties,
                                              payload_array => l_queue_msg,
                                              msgid_array => l_message_handle,
                                              error_array => l_error);
    -- Fixed: the guard tests the debug level, so log at debug level as well
    -- (the original called plog.info here).
    IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, 'retval dans WHILE : ' || retval); END IF;
    -- Append the new batch to the accumulated events.
    IF retval > l_queue_msg_full.LAST THEN
      FOR z IN l_queue_msg.FIRST..l_queue_msg.LAST LOOP
        l_queue_msg_full.extend;
        l_queue_msg_full(l_queue_msg_full.LAST):=l_queue_msg(z);
      END LOOP;
    END IF;
  END LOOP;

  -- Trace every event and classify it.
  FOR z IN 1..retval LOOP
    IF plog.isdebugenabled(pCTX) THEN
      plog.debug(pCTX, 'event_type : ' || l_queue_msg_full(z).event_type);
      plog.debug(pCTX, 'object_owner : ' || l_queue_msg_full(z).object_owner);
      plog.debug(pCTX, 'object_name : ' || l_queue_msg_full(z).object_name);
      plog.debug(pCTX, 'event_timestamp: ' || l_queue_msg_full(z).event_timestamp);
      plog.debug(pCTX, 'error_code : ' || l_queue_msg_full(z).error_code);
      plog.debug(pCTX, 'event_status : ' || l_queue_msg_full(z).event_status);
      plog.debug(pCTX, 'log_id : ' || l_queue_msg_full(z).log_id);
      plog.debug(pCTX, 'run_count : ' || l_queue_msg_full(z).run_count);
      plog.debug(pCTX, 'failure_count : ' || l_queue_msg_full(z).failure_count);
      plog.debug(pCTX, 'retry_count : ' || l_queue_msg_full(z).retry_count);
    END IF;
    IF l_queue_msg_full(z).event_type = 'JOB_SUCCEEDED' THEN
      nb_succeed := nb_succeed + 1 ;
    -- Fixed: a stopped job also leaves the dossier incomplete. Counting it as
    -- a failure ensures the rollback job is submitted; previously neither job
    -- was created in that case and the caller kept waiting for an event.
    ELSIF l_queue_msg_full(z).event_type IN ('JOB_FAILED', 'JOB_STOPPED') THEN
      nb_failed := nb_failed + 1 ;
    END IF;
  END LOOP;

  IF nb_failed > 0 THEN
    -- ROLLBACK
    -- Generated block: for every code in RRG.RRGQAZY.tab_codeInformation,
    -- delete the dossier's rows from table RRGQT<code> (bind variables are
    -- used inside the generated EXECUTE IMMEDIATE), then commit and log.
    v_sqlStt := 'DECLARE'||CHR(10)
      ||' NOM_TRT VARCHAR2(50) := ''LOAD_ZY_MATCLE_'|| p_avantDernierMatcle||''';'||CHR(10)
      ||' pCTX PLOG.LOG_CTX ;'||CHR(10)
      ||' l_start NUMBER := '|| l_start ||';'||CHR(10)
      ||' p_matcle VARCHAR2(12) := ''' || tab_matcle(j) ||''';'||CHR(10)
      ||' p_avantderniermatcle NUMBER := ' || p_avantderniermatcle || ';'||CHR(10)
      ||' p_dateChargement TIMESTAMP := TO_TIMESTAMP('''||TO_CHAR(p_dateChargement, CONSTANTS.FMT_DATE)||''', RRG.CONSTANTS.FMT_DATE); '||CHR(10)
      ||' BEGIN'||CHR(10)
      ||' pCTX:= PLOG.init(NOM_TRT,RRG.RRGQAZY.LOG_LEVEL);'||CHR(10)
      ||' FOR i IN RRG.RRGQAZY.tab_codeInformation.FIRST..RRG.RRGQAZY.tab_codeInformation.LAST LOOP'||CHR(10)
      ||' EXECUTE IMMEDIATE '||CHR(10)
      ||' ''DELETE FROM RRGQT''||RRG.RRGQAZY.tab_codeInformation(i)||'' WHERE MATRICULE_RH = :1'||CHR(10)
      ||' AND AVANT_DERNIER_MATCLE = :2'||CHR(10)
      ||' AND DINFOIDENTITE = :3'' USING p_matcle, p_avantderniermatcle, p_dateChargement;'||CHR(10)
      ||' END LOOP;'||CHR(10)
      ||' COMMIT;'||CHR(10)
      ||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, '' ===> ROLLBACK : TRAITEMENT DU DOSSIER INCOMPLET <'||tab_matcle(j)||'>''||chr(9)||''DUREE (hsec) <''||(DBMS_UTILITY.get_time - l_start)||''>''); END IF;'||CHR(10)
      ||' END;';
    v_rollback_job_name := dbms_scheduler.generate_job_name('ROLLBACK_'||tab_matcle(j)||'_');
    IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, v_rollback_job_name || ' : ' || v_rollback_condition); END IF;
    -- Plain (non event-based) job: created disabled, configured, then enabled.
    DBMS_SCHEDULER.CREATE_JOB (
      job_name => v_rollback_job_name,
      job_type => 'PLSQL_BLOCK',
      job_action => v_sqlStt,
      auto_drop => TRUE,
      enabled => FALSE,
      comments => 'ROLLBACK_'||tab_matcle(j)||'_');
    -- The job raises its own completion events so that its end can be
    -- detected by reading the scheduler event queue.
    DBMS_SCHEDULER.set_attribute(
      name => v_rollback_job_name,
      ATTRIBUTE => 'raise_events',
      VALUE => DBMS_SCHEDULER.JOB_RUN_COMPLETED );
    DBMS_SCHEDULER.ENABLE(v_rollback_job_name);
  ELSIF nb_succeed = thread_pool.COUNT THEN
    -- COMMIT
    -- Generated block: remove the dossier's row from the work table
    -- RRGQTZY_W<p_avantderniermatcle>, commit and log the elapsed time.
    v_sqlStt := 'DECLARE'||CHR(10)
      ||' NOM_TRT VARCHAR2(50) := ''LOAD_ZY_MATCLE_'|| p_avantDernierMatcle||''';'||CHR(10)
      ||' pCTX PLOG.LOG_CTX ;'||CHR(10)
      ||' l_start NUMBER := '|| l_start ||';'||CHR(10)
      ||' BEGIN'||CHR(10)
      ||' pCTX:= PLOG.init(NOM_TRT,RRG.RRGQAZY.LOG_LEVEL);'||CHR(10)
      ||' DELETE RRGQTZY_W'||p_avantderniermatcle||' A WHERE A.DATE_TRAITEMENT = TO_TIMESTAMP('''||TO_CHAR(p_dateChargement, CONSTANTS.FMT_DATE)||''', RRG.CONSTANTS.FMT_DATE) AND A.MATRICULE_RH = '''||tab_matcle(j)||''';'||CHR(10)
      ||' COMMIT;'||CHR(10)
      ||' IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, '' ==> COMMIT : FIN TRAITEMENT <'||tab_matcle(j)||'>''||chr(9)||''DUREE (hsec) <''||(DBMS_UTILITY.get_time - l_start)||''>''); END IF;'||CHR(10)
      ||' END;';
    v_commit_job_name := dbms_scheduler.generate_job_name('SUCCESS_'||tab_matcle(j)||'_');
    IF plog.isdebugenabled(pCTX) THEN plog.debug(pCTX, v_commit_job_name || ' : ' || v_commit_condition); END IF;
    DBMS_SCHEDULER.CREATE_JOB (
      job_name => v_commit_job_name,
      job_type => 'PLSQL_BLOCK',
      job_action => v_sqlStt,
      auto_drop => TRUE,
      enabled => FALSE,
      comments => 'SUCCESS_'||tab_matcle(j)||'_');
    DBMS_SCHEDULER.set_attribute(
      name => v_commit_job_name,
      ATTRIBUTE => 'raise_events',
      VALUE => DBMS_SCHEDULER.JOB_RUN_COMPLETED );
    DBMS_SCHEDULER.ENABLE(v_commit_job_name);
  END IF;
END;
-- This block drains the scheduler event queue.
-- It looks for the completion event of the ROLLBACK or COMMIT job before
-- moving on to the next dossier.
DECLARE
  l_deq_opts  DBMS_AQ.dequeue_options_t;
  l_msg_props DBMS_AQ.message_properties_t;
  l_msg_id    RAW(16);
  l_event     sys.scheduler$_event_info;
BEGIN
  l_deq_opts.consumer_name := 'RRG';
  -- Match only a JOB_FAILED / JOB_SUCCEEDED event raised by either follow-up job.
  l_deq_opts.deq_condition :=
    'tab.user_data.object_name IN ('''|| v_commit_job_name ||''','''|| v_rollback_job_name ||''') AND (tab.user_data.event_type = ''JOB_FAILED'' OR tab.user_data.event_type = ''JOB_SUCCEEDED'')';

  DBMS_AQ.dequeue(
    queue_name         => 'SYS.SCHEDULER$_EVENT_QUEUE',
    dequeue_options    => l_deq_opts,
    message_properties => l_msg_props,
    payload            => l_event,
    msgid              => l_msg_id);

  -- Dump every field of the received event at info level.
  IF plog.isinfoenabled(pCTX) THEN
    plog.info(pCTX, 'event_type : ' || l_event.event_type);
    plog.info(pCTX, 'object_owner : ' || l_event.object_owner);
    plog.info(pCTX, 'object_name : ' || l_event.object_name);
    plog.info(pCTX, 'event_timestamp: ' || l_event.event_timestamp);
    plog.info(pCTX, 'error_code : ' || l_event.error_code);
    plog.info(pCTX, 'event_status : ' || l_event.event_status);
    plog.info(pCTX, 'log_id : ' || l_event.log_id);
    plog.info(pCTX, 'run_count : ' || l_event.run_count);
    plog.info(pCTX, 'failure_count : ' || l_event.failure_count);
    plog.info(pCTX, 'retry_count : ' || l_event.retry_count);
  END IF;
END;
IF plog.isinfoenabled(pCTX) THEN plog.info(pCTX, ' ==> TEMPS TOTAL DU TRAITEMENT DU DOSSIER <'||tab_matcle(j)||'>'||chr(9)||'DUREE (hsec) <'||(DBMS_UTILITY.get_time - l_start)||'>'); END IF;
END IF;
And it runs very well.
I now get a processing time between 0.6 and 0.8 sec. YEAH!!!
DBMS_SCHEDULER power !
|
|
|
Goto Forum:
Current Time: Sat Aug 23 13:16:26 CDT 2025
|