-- 自定义流复制冲突的处理过程
/*
  Conflict log table, created on the destination (target) database.
  When the apply handler receives a conflicting change it records the
  change in this table.
*/
CREATE TABLE changeddata
(
  local_transaction_id  VARCHAR2(32)   NOT NULL,  -- transaction id reported by the row LCR
  source_database       VARCHAR2(128),            -- source database name reported by the row LCR
  command_type          VARCHAR2(16),             -- DML command type, e.g. insert / update / delete
  object_name           VARCHAR2(64),             -- owner.object_name of the changed object
  old_values            VARCHAR2(4000),           -- "column:value" pairs of the old values
  new_values            VARCHAR2(4000),           -- "column:value" pairs of the new values
  current_date          DATE,                     -- filled from the LCR source time by the handler
  CONSTRAINT pk_changeddata PRIMARY KEY (local_transaction_id)
);
CREATE OR REPLACE PACKAGE pkg_handle AS

  -- Collection type used for the error_messages argument of p_changeddata.
  TYPE emsg_array IS TABLE OF VARCHAR2(2000) INDEX BY BINARY_INTEGER;

  -- Converts an ANYDATA value into its VARCHAR2 text representation.
  FUNCTION f_getdata(data ANYDATA) RETURN VARCHAR2;

  -- Records the change carried by a row LCR into the CHANGEDDATA table.
  -- The four-argument signature is the one this script registers through
  -- DBMS_APPLY_ADM.SET_DML_HANDLER with error_handler => true.
  PROCEDURE p_changeddata(
    message           IN ANYDATA,
    error_stack_depth IN NUMBER,
    error_numbers     IN DBMS_UTILITY.NUMBER_ARRAY,
    error_messages    IN emsg_array
  );

END pkg_handle;
/
CREATE OR REPLACE PACKAGE BODY pkg_handle AS

  -- Byte capacity of CHANGEDDATA.OLD_VALUES / NEW_VALUES (both VARCHAR2(4000)).
  c_max_values_len constant pls_integer := 4000;

  /*
    f_getdata
    Purpose : convert an ANYDATA value into a VARCHAR2 string.
    Param   : data - the wrapped value; may be NULL.
    Returns : the text form of the value, cut to at most 256 characters.
              NULL is returned for a NULL input and for any type other than
              VARCHAR2, CHAR, VARCHAR, NUMBER and DATE.
              Dates are rendered as 'yyyy-mm-dd hh24:mi:ss'.
  */
  function f_getdata(data anydata) return varchar2 is
    vs_str      varchar2(4000);
    vi_num      number;
    vd_date     date;
    vs_typename varchar2(61);
    vi_res      number; -- return code of the ANYDATA getters; not inspected
  begin
    if data is null then
      return '';
    end if;

    vs_typename := lower(data.gettypename());

    if vs_typename = 'sys.varchar2' then
      vi_res := data.getvarchar2(vs_str);
    elsif vs_typename = 'sys.char' then
      vi_res := data.getchar(vs_str);
    elsif vs_typename = 'sys.varchar' then
      vi_res := data.getvarchar(vs_str);
    elsif vs_typename = 'sys.number' then
      vi_res := data.getnumber(vi_num);
      vs_str := to_char(vi_num);
    elsif vs_typename = 'sys.date' then
      vi_res := data.getdate(vd_date);
      vs_str := to_char(vd_date, 'yyyy-mm-dd hh24:mi:ss');
    else
      -- Unsupported type: reported as an empty value.
      vs_str := '';
    end if;

    -- Cap each single value so that one wide column cannot fill the log row.
    return substr(vs_str, 1, 256);
  end f_getdata;

  /*
    f_format_values (private helper)
    Purpose : render a list of LCR column values as
              'col1:val1,col2:val2,...' using f_getdata for each value.
    Param   : p_values - list returned by lcr$_row_record.get_values;
                         may be NULL or empty.
    Returns : the joined string, cut to c_max_values_len bytes so it always
              fits the OLD_VALUES / NEW_VALUES columns; NULL when there is
              nothing to render.
  */
  function f_format_values(p_values in sys.lcr$_row_list) return varchar2 is
    vs_buf varchar2(32767);
  begin
    if p_values is null then
      return null;
    end if;

    for i in 1 .. p_values.count loop
      if p_values(i) is not null then
        -- Separator goes in front of every entry but the first, so no
        -- trailing comma has to be stripped afterwards.
        if vs_buf is not null then
          vs_buf := vs_buf || ',';
        end if;
        vs_buf := vs_buf || p_values(i).column_name || ':' ||
                  f_getdata(p_values(i).data);
        -- Stop as soon as the target column is full; this also keeps the
        -- work buffer far below its own 32767-byte limit.
        exit when lengthb(vs_buf) >= c_max_values_len;
      end if;
    end loop;

    return substrb(vs_buf, 1, c_max_values_len);
  end f_format_values;

  /*
    p_changeddata
    Purpose : record the change carried by a row LCR in table CHANGEDDATA.
              The table is emptied first, so it only ever holds the most
              recently recorded change.
    Params  : message           - the message handed to the handler; only
                                  messages of type SYS.LCR$_ROW_RECORD are
                                  recorded.
              error_stack_depth - not used by this procedure.
              error_numbers     - not used by this procedure.
              error_messages    - not used by this procedure.
    Notes   : * The procedure runs as an autonomous transaction. Its
                DELETE/INSERT/COMMIT/ROLLBACK therefore never commit or roll
                back the transaction of the caller (the apply process).
              * The log is emptied with DELETE rather than TRUNCATE so that a
                failure while recording rolls back to the previous content
                instead of leaving the table empty.
              * Any error raised while recording is deliberately swallowed
                (best effort logging); the procedure never raises.
              * NOTE(review): the LCR itself is not executed here, so the
                change is only logged, not applied - confirm this is intended.
  */
  procedure p_changeddata(message           IN ANYDATA,
                          error_stack_depth IN NUMBER,
                          error_numbers     IN DBMS_UTILITY.NUMBER_ARRAY,
                          error_messages    IN emsg_array) is
    pragma autonomous_transaction;

    vlcr_row       sys.lcr$_row_record;
    vi_res         pls_integer; -- return code of getobject; not inspected
    vs_cmdtype     changeddata.command_type%type; -- DML type: insert, update, delete, ...
    vs_txnid       changeddata.local_transaction_id%type;
    vs_source      changeddata.source_database%type;
    vs_object      changeddata.object_name%type;
    vs_oldvalues   changeddata.old_values%type;
    vs_newvalues   changeddata.new_values%type;
    vd_changedtime date;
  begin
    /* Empty the log table first (transactional, see Notes above). */
    delete from changeddata;

    /* Only row (DML) LCRs are recorded. */
    if message is not null
       and lower(message.gettypename()) = 'sys.lcr$_row_record' then
      vi_res         := message.getobject(vlcr_row);
      vs_txnid       := vlcr_row.get_transaction_id;
      vs_source      := vlcr_row.get_source_database_name;
      vs_cmdtype     := lower(vlcr_row.get_command_type);
      vs_object      := vlcr_row.get_object_owner || '.' ||
                        vlcr_row.get_object_name;
      vd_changedtime := vlcr_row.get_source_time;

      case vs_cmdtype
        when 'insert' then
          vs_newvalues := f_format_values(vlcr_row.get_values('new', 'N'));
        when 'delete' then
          vs_oldvalues := f_format_values(vlcr_row.get_values('old'));
        when 'update' then
          vs_oldvalues := f_format_values(vlcr_row.get_values('old'));
          vs_newvalues := f_format_values(vlcr_row.get_values('new', 'N'));
        else
          -- Any other command type is still logged, just without column
          -- values. Without this branch CASE raises CASE_NOT_FOUND.
          null;
      end case;

      insert into changeddata
        (local_transaction_id,
         source_database,
         command_type,
         object_name,
         old_values,
         new_values,
         current_date)
      values
        (vs_txnid,
         vs_source,
         vs_cmdtype,
         vs_object,
         vs_oldvalues,
         vs_newvalues,
         vd_changedtime);
    end if;

    -- An autonomous transaction must be ended before returning.
    commit;
  exception
    when others then
      -- Best effort: undo the partial logging work and return quietly.
      rollback;
  end p_changeddata;

end pkg_handle;
/
/*
  Register the conflict handling procedure with the apply process.
  NOTE(review): user_procedure now points at pkg_handle, the package created
  by this script (the original referenced pkg_a and had an unterminated string
  literal). Prefix it with the owning schema if the apply user differs.
*/
begin
  DBMS_APPLY_ADM.SET_DML_HANDLER(
    object_name    => 'xyz.b',                    -- table on which conflicts are handled
    object_type    => 'table',                    -- object type; only tables are supported
    operation_name => 'update',                   -- DML operation the handler is attached to
    error_handler  => true,                       -- true: handle only DML that raised an apply error; false: handle all DML
    user_procedure => 'pkg_handle.p_changeddata', -- user-defined conflict handling procedure
    apply_name     => 'streams_apply'             -- apply process; NULL means all apply processes
  );
end;
/
-- 导入论坛 引用链接 收藏 分享给好友 推荐到圈子 管理 举报
-- TAG:

