We have discussed using Service Broker as a task scheduler to read, load, and execute tasks from a queue. The issue of this approach is that the scheduler has to constantly searching the queue to find an available task and then load and execute it. While there is nothing in the queue, Service Brocker will have to perform the same operation but for nothing, in which it adds extra unnecessaryload to the system. Why not use Service Broker to perfrom functions of scheduler, queue, and executor?
Yes, scheduling tasks use Service Broker only can remove extra coding and overhead of queue querying since Service Broker already has the component called Queue. When there is an asynchronous request (query task), the request will be sent to a taget service broker queue. The activation procedure will pick up the task and execute it. After the execution, the activation procedure at the target will end the conversation. Once end of conversation is called, an end conversation message will be sent to the initiator to inform the initiator to complete the conversation.
Now let’s prepare the work space
use master if DB_ID('test') is not null begin alter database test set single_user with rollback immediate drop database test end create database test alter database test set enable_broker go
Then let’s create execution log and wrapper
go create table ExecutionLog ( ExecutinLogID int identity(1,1) primary key, SQL nvarchar(max), StartTime datetime default(getdate()), EndTime datetime ) go create procedure ExecSQL(@SQL nvarchar(max)) as begin set nocount on declare @ExecutionLogID int insert into ExecutionLog (SQL) values(@SQL) select @ExecutionLogID = SCOPE_IDENTITY(); begin try exec(@SQL) end try begin catch --- error handling end catch update ExecutionLog set EndTime = GETDATE() where ExecutinLogID = @ExecutionLogID end
As we discussed at beginning, the activation procedure should read a task from the queue, execute the task and the end the conversation
create procedure ActivationTarget as begin declare @Handle uniqueidentifier, @Type sysname, @msg nvarchar(max) waitfor( Receive top (1) @Handle = conversation_handle, @Type = message_type_name, @msg = message_body from QueueTarget ), timeout 5000 if @Handle is null return; if @Type = 'DEFAULT' and @msg is not null exec ExecSQL @msg end conversation @Handle; end go
Create Service Broker structure using default contract:
create queue QueueTarget with status = on, activation( status = on, procedure_name = ActivationTarget, max_queue_readers = 5, execute as self ) go create service ServiceTarget on queue QueueTarget([DEFAULT]) go create queue QueueInitiator go create service ServiceInitiator on queue QueueInitiator([DEFAULT]) go
Create a wrap-up procedure for asynchronous call
create procedure ExecAsync (@SQL nvarchar(max), @handle uniqueidentifier = null output) as begin select @handle = null; if rtrim(isnull(@SQL, '')) = '' return; begin dialog conversation @handle from service ServiceInitiator to service 'ServiceTarget' on contract [DEFAULT] with encryption = off; send on conversation @handle message type [DEFAULT](@SQL) return end
Now let’s perform a test which includes 2 piece of code. The first is to invoke 10 asyncronous call and then save the conversation handle to a temp table for synchronization purpose. The second code snippet is to synchronize those 10 invocations.
--test if object_id('tempdb..#handles') is not null drop table #handles create table #handles (handle uniqueidentifier primary key) go declare @SQL nvarchar(max) = 'waitfor delay ''00:00:10''' declare @i int = 1, @handle uniqueidentifier while @i<=10 begin exec ExecAsync @SQL, @handle output insert into #handles values(@handle) select @i +=1 end go --synchronization declare @handle uniqueidentifier while exists(select * from #handles) begin waitfor( receive top (1) @handle = conversation_handle from QueueInitiator ), timeout -1; end conversation @handle; delete #handles where handle = @handle end
The test script will return in 40 seconds which is way faster comparing to running them sequentially.
select * from ExecutionLog select datediff(second, min(StartTime), max(EndTime)) from ExecutionLog /* ExecutinLogID SQL StartTime EndTime ------------- ------------------------- ----------------------- ----------------------- 1 waitfor delay '00:00:10' 2012-01-23 10:38:35.210 2012-01-23 10:38:45.213 2 waitfor delay '00:00:10' 2012-01-23 10:38:40.913 2012-01-23 10:38:50.917 3 waitfor delay '00:00:10' 2012-01-23 10:38:45.220 2012-01-23 10:38:55.220 4 waitfor delay '00:00:10' 2012-01-23 10:38:45.230 2012-01-23 10:38:55.230 5 waitfor delay '00:00:10' 2012-01-23 10:38:50.913 2012-01-23 10:39:00.917 6 waitfor delay '00:00:10' 2012-01-23 10:38:50.920 2012-01-23 10:39:00.920 7 waitfor delay '00:00:10' 2012-01-23 10:38:55.227 2012-01-23 10:39:05.227 8 waitfor delay '00:00:10' 2012-01-23 10:38:55.233 2012-01-23 10:39:05.237 9 waitfor delay '00:00:10' 2012-01-23 10:38:55.237 2012-01-23 10:39:05.237 10 waitfor delay '00:00:10' 2012-01-23 10:39:00.920 2012-01-23 10:39:10.920 (10 row(s) affected) ----------- 35 (1 row(s) affected) */
Alternatively, you can also put activation procedure on initiator queue if you don’t care about the synchronization.
create procedure ActivationInitiator as begin declare @Handle uniqueidentifier; waitfor( Receive top (1) @Handle = conversation_handle from QueueInitiator ), timeout 50 if @Handle is null return; end conversation @Handle; end go alter queue QueueInitiator with status = on, activation( status = on, procedure_name = ActivationInitiator, max_queue_readers = 1, execute as self ) go
Finally, let’s run a query to check number of conversations in the conversation endpoint.
select COUNT(*) from sys.conversation_endpoints /* ----------- 10 (1 row(s) affected) */
It returns 10. Why are there still 10 conversations in the conversation endpoint even all of them are completed (ended)? This is for security reason. An ended conversation will remain in the conversation endpoint for 30 minutes before they are cleaned up. If you have a lot of tasks to be scheduled during the day, you may suffer from performance issue due to that 30 minutes ended conversation retention period. In this case, you may think of using only one conversation for all tasks. Then your synchronization strategy needs to be changed. What if you need running tasks in a complex ordering? I will provide example code in next post of this series to result those issues.