在 SQL Server 2005中,通过新增的 Service Broker可以实现异步触发器的处理功能。本文提供一种使用 Service Broker实现的通用异步触发器方法。
在本方法中,通过 Service Broker构造异步触发器处理架构,对于要使用这种架构的表,只需要创建相应的触发器及处理触发器中数据的存储过程,并且在异步触发器架构中登记触发器和处理的存储过程即可。如果一个触发器中的数据要被多个表使用,只需要在 dbo.tb_async_trigger_subscribtion中登记相应处理数据的存储过程即可,即一个表的数据变更可以被多个表订阅(使用)。
架构的步骤如下:
需要配置数据库以允许使用 Service Broker。本文以 tempdb库为例,故配置均在 tempdb上下文中进行。
下面的 T-SQL创建异步触发器处理架构相关的对象。
-- ======================================= -- 异步触发器对象 -- 1. service broker 对象 -- ======================================= -- a. message type, 要求使用xml 传递数据 CREATE MESSAGE TYPE MSGT_async_trigger VALIDATION = WELL_FORMED_XML GO -- b. 只需要发送消息 CREATE CONTRACT CNT_async_trigger( MSGT_async_trigger SENT BY INITIATOR) GO -- c. 存储消息的队列 CREATE QUEUE dbo.Q_async_trigger GO -- d. 用于消息处理的服务 CREATE SERVICE SRV_async_trigger ON QUEUE dbo.Q_async_trigger( CNT_async_trigger) GO -- ======================================= -- 异步触发器对象 -- 2. 异步触发器处理的对象 -- ======================================= -- a. 登记异步触发器的表 CREATE TABLE dbo.tb_async_trigger( ID int IDENTITY PRIMARY KEY, table_name sysname, trigger_name sysname ) -- b. 登记订阅异步触发器的存储过程 CREATE TABLE dbo.tb_async_trigger_subscriber( ID int IDENTITY PRIMARY KEY, procedure_name sysname ) -- c. 异步触发器和存储过程之间的订阅关系 CREATE TABLE dbo.tb_async_trigger_subscribtion( trigger_id int REFERENCES dbo.tb_async_trigger( ID), procedure_id int REFERENCES dbo.tb_async_trigger_subscriber( ID), PRIMARY KEY( trigger_id, procedure_id) ) GO -- d. 发送消息的存储过程 CREATE PROC dbo.p_async_trigger_send @message xml AS SET NOCOUNT ON DECLARE @handle uniqueidentifier BEGIN DIALOG CONVERSATION @handle FROM SERVICE [SRV_async_trigger] TO SERVICE N'SRV_async_trigger' ON CONTRACT CNT_async_trigger WITH ENCRYPTION = OFF; SEND ON CONVERSATION @handle MESSAGE TYPE MSGT_async_trigger( @message); -- 消息发出即可, 不需要回复, 因此发出后即可结束会话 END CONVERSATION @handle GO -- e. 处理异步触发器发送的消息 CREATE PROC dbo.p_async_trigger_process AS SET NOCOUNT ON DECLARE @handle uniqueidentifier, @message xml, @rows int SET @rows = 1 WHILE @rows > 0 BEGIN -- 处理已经收到的消息 WAITFOR( RECEIVE TOP(1) @handle = conversation_handle, @message = CASE WHEN message_type_name = N'MSGT_async_trigger' THEN CONVERT(xml, message_body) ELSE NULL END FROM dbo.Q_async_trigger ), TIMEOUT 10 SET @rows = @@ROWCOUNT IF @rows > 0 BEGIN -- 结束会话 END CONVERSATION @handle; -- 处理消息 -- a. 取发送者信息 DECLARE @table_name sysname, @trigger_name sysname, @sql nvarchar(max) SELECT @table_name = @message.value('(/root/table_name)[1]', 'sysname'), @trigger_name = @message.value('(/root/trigger_name)[1]', 'sysname') -- b. 调用异步触发器订阅的存储过程 ;WITH SUB AS( SELECT TR.table_name, TR.trigger_name, SUB.procedure_name FROM dbo.tb_async_trigger TR, dbo.tb_async_trigger_subscriber SUB, dbo.tb_async_trigger_subscribtion TRSUB WHERE TRSUB.trigger_id = TR.ID AND TRSUB.procedure_id = SUB.ID ) SELECT @sql = ( SELECT N' EXEC ' + procedure_name + N' @message ' FROM SUB WHERE table_name = @table_name AND trigger_name = @trigger_name FOR XML PATH(''), ROOT('r'), TYPE ).value('(/r)[1]', 'nvarchar(max)') EXEC sp_executesql @sql, N'@message xml', @message END END GO -- f. 绑定处理的存储过程到队列 ALTER QUEUE dbo.Q_async_trigger WITH ACTIVATION( STATUS = ON, PROCEDURE_NAME = dbo.p_async_trigger_process, MAX_QUEUE_READERS = 10, EXECUTE AS OWNER) GO
下面的 T-SQL演示使用异步触发器构架。示例中创建了三个表:
Dbo.t1
Dbo.t2
Dbo.tb_log
触发器 TR_async_trigger 用于将表 Dbo.t1中的数据变化发送到异步触发器构架中。 dbo.p_Sync_t1_t2和 dbo.p_Record_log用于处理 dbo.t1于中变化的数据。
在处理时,需要把相关的信息登记到异步触发器架构的表中。
-- ======================================= -- 3. 使用示例 -- ======================================= -- =============================== -- 测试对象 -- a. 源表 CREATE TABLE dbo.t1( id int IDENTITY PRIMARY KEY, col int ) -- b. 同步的目的表 CREATE TABLE dbo.t2( id int IDENTITY PRIMARY KEY, col int ) -- c. 记录操作的日志表 CREATE TABLE dbo.tb_log( id int IDENTITY PRIMARY KEY, user_name sysname, operate_type varchar(10), inserted xml, deleted xml ) GO -- a. 异步发送处理消息的触发器 CREATE TRIGGER TR_async_trigger ON dbo.t1 FOR INSERT, UPDATE, DELETE AS IF @@ROWCOUNT = 0 RETURN SET NOCOUNT ON -- 将要发送的数据生成xml 数据 DECLARE @message xml SELECT @message = ( SELECT table_name = ( SELECT TOP 1 OBJECT_NAME(parent_object_id) FROM sys.objects WHERE object_id = @@PROCID), trigger_name = OBJECT_NAME(@@PROCID), user_name = SUSER_SNAME(), inserted = ( SELECT * FROM inserted FOR XML AUTO, TYPE), deleted = ( SELECT * FROM deleted FOR XML AUTO, TYPE) FOR XML PATH(''), ROOT('root'), TYPE ) -- 发送消息 EXEC dbo.p_async_trigger_send @message = @message GO -- b. 处理异步触发器的存储过程 -- b.1 同步到t2 的存储过程 CREATE PROC dbo.p_Sync_t1_t2 @message xml AS SET NOCOUNT ON DECLARE @inserted bit, @deleted bit SELECT @inserted = @message.exist('/root/inserted'), @deleted = @message.exist('/root/deleted') IF @inserted = 1 IF @deleted = 1 -- 更新 BEGIN ;WITH I AS( SELECT id = T.c.value('@id[1]', 'int'), col = T.c.value('@col[1]', 'int') FROM @message.nodes('/root/inserted/inserted') T(c) ), D AS( SELECT id = T.c.value('@id[1]', 'int'), col = T.c.value('@col[1]', 'int') FROM @message.nodes('/root/deleted/deleted') T(c) ) UPDATE A SET col = I.col FROM dbo.t2 A, I, D WHERE A.ID = I.ID AND I.ID = D.ID END ELSE -- 插入 BEGIN SET IDENTITY_INSERT dbo.t2 ON ;WITH I AS( SELECT id = T.c.value('@id[1]', 'int'), col = T.c.value('@col[1]', 'int') FROM @message.nodes('/root/inserted/inserted') T(c) ) INSERT dbo.t2( id, col) SELECT id, col FROM I SET IDENTITY_INSERT dbo.t2 OFF END ELSE -- 删除 BEGIN ;WITH D AS( SELECT id = T.c.value('@id[1]', 'int'), col = T.c.value('@col[1]', 'int') FROM @message.nodes('/root/deleted/deleted') T(c) ) DELETE A FROM dbo.t2 A, D WHERE A.ID = D.ID END GO -- b.2 记录操作记录到dbo.tb_log 的存储过程 CREATE PROC dbo.p_Record_log @message xml AS SET NOCOUNT ON DECLARE @inserted bit, @deleted bit SELECT @inserted = @message.exist('/root/inserted'), @deleted = @message.exist('/root/deleted') INSERT dbo.tb_log( user_name, operate_type, inserted, deleted) SELECT @message.value('(/root/user_name)[1]', 'sysname'), operate_type = CASE WHEN @inserted = 1 AND @deleted = 1 THEN 'update' WHEN @inserted = 1 THEN 'insert' WHEN @deleted = 1 THEN 'delete' END, @message.query('/root/inserted'), @message.query('/root/deleted') GO -- =============================== -- 在异步触发器处理系统中登记对象 INSERT dbo.tb_async_trigger( table_name, trigger_name) VALUES( N't1', N'TR_async_trigger') INSERT dbo.tb_async_trigger_subscriber( procedure_name) SELECT N'dbo.p_Sync_t1_t2' UNION ALL SELECT N'dbo.p_Record_log' INSERT dbo.tb_async_trigger_subscribtion( trigger_id, procedure_id) SELECT 1, 1 UNION ALL SELECT 1, 2 GO
下面的 T-SQL修改表 dbo.t1中的数据,并检查 dbo.t2、 dbo.tb_log中的数据,以确定异步触发器架构的工作是否成功。
执行完成后可以看到 dbo.t2、 dbo.tb_log中有相关的记录。
-- =============================== -- 测试 INSERT dbo.t1 SELECT 1 UNION ALL SELECT 2 UPDATE dbo.t1 SET col = 2 WHERE id = 1 DELETE dbo.t1 WHERE id = 2 -- 显示结果 WAITFOR DELAY '00:00:05' -- 延迟5 分钟, 以便有时间处理消息(因为是异步的) SELECT * FROM dbo.t2 SELECT * FROM dbo.tb_log GO
下面的 T-SQL删除本文中建立的所有对象。
-- ======================================= -- 5. 删除相关的对象 -- ======================================= -- a. 删除service broker 对象 DROP SERVICE SRV_async_trigger DROP QUEUE dbo.Q_async_trigger DROP CONTRACT CNT_async_trigger DROP MESSAGE TYPE MSGT_async_trigger GO -- b. 删除异步触发器处理的相关对象 DROP PROC dbo.p_async_trigger_process DROP PROC dbo.p_async_trigger_send DROP TABLE dbo.tb_async_trigger_subscribtion DROP TABLE dbo.tb_async_trigger_subscriber DROP TABLE dbo.tb_async_trigger GO -- c. 删除测试的对象 DROP TABLE dbo.tb_log, dbo.t1, dbo.t2 DROP PROC dbo.p_Sync_t1_t2, dbo.p_Record_log
已投稿到: |
|
---|