We have discussed using Service Broker as a task scheduler that reads, loads, and executes tasks from a queue. The issue with this approach is that the scheduler has to constantly search the queue to find an available task and then load and execute it. When there is nothing in the queue, Service Broker still has to perform the same operation for nothing, which adds unnecessary load to the system. Why not use Service Broker to perform the functions of scheduler, queue, and executor?

Yes, scheduling tasks with Service Broker alone removes the extra coding and the overhead of querying a queue, since Service Broker already has a component called a queue. When there is an asynchronous request (a query task), the request is sent to a target Service Broker queue. The activation procedure picks up the task and executes it. After the execution, the activation procedure at the target ends the conversation. Once END CONVERSATION is called, an end-dialog message is sent to the initiator, telling it to end its side of the conversation.
Now let’s prepare the workspace:

use master
if DB_ID('test') is not null
begin
	alter database test set single_user with rollback immediate
	drop  database test
end
create database test
alter database test set enable_broker
go
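
Before going further, it can help to confirm that Service Broker is really enabled in the new database. This is only a quick check, assuming the database name test from the script above:

```sql
-- is_broker_enabled = 1 means messages can be delivered in this database
select name, is_broker_enabled, service_broker_guid
from sys.databases
where name = 'test'
```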

Then let’s create the execution log table and a wrapper procedure that logs each task it runs:

use test
go
create table ExecutionLog
(
	ExecutinLogID int identity(1,1) primary key,
	SQL nvarchar(max),
	StartTime datetime default(getdate()),
	EndTime datetime
)
go
create procedure ExecSQL(@SQL nvarchar(max))
as
begin
	set nocount on
	declare @ExecutionLogID int
	insert into ExecutionLog (SQL)
		values(@SQL)
	select @ExecutionLogID = SCOPE_IDENTITY();
	begin try
	exec(@SQL)
	end try
	begin catch
		--- error handling
	end catch
	update ExecutionLog 
		set EndTime = GETDATE()
	where ExecutinLogID = @ExecutionLogID
end
go

As we discussed at the beginning, the activation procedure should read a task from the queue, execute the task, and then end the conversation:

create procedure ActivationTarget
as
begin
	declare @Handle uniqueidentifier, @Type sysname, @msg nvarchar(max)
	waitfor(
		Receive top (1)
			@Handle = conversation_handle,
			@Type = message_type_name,
			@msg = message_body 
		from QueueTarget
	), timeout 5000
	if @Handle is null
		return;
	if @Type = 'DEFAULT' and @msg is not null
		exec ExecSQL @msg
	end conversation @Handle;
end
go
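
The procedure above handles one message per activation. A common variation is to keep receiving until the queue stays empty, so that one reader can drain several tasks. The sketch below is an assumption, not part of the original setup: the name ActivationTargetLoop is hypothetical, and it reuses ExecSQL and QueueTarget exactly as defined in this post.

```sql
-- Hypothetical looping variant of ActivationTarget (name and loop are assumptions)
create procedure ActivationTargetLoop
as
begin
	set nocount on
	declare @Handle uniqueidentifier, @Type sysname, @msg nvarchar(max)
	while 1 = 1
	begin
		-- reset the variables so a timed-out receive can be detected
		select @Handle = null, @Type = null, @msg = null
		waitfor(
			receive top (1)
				@Handle = conversation_handle,
				@Type = message_type_name,
				@msg = message_body
			from QueueTarget
		), timeout 5000
		if @Handle is null
			break	-- nothing arrived within 5 seconds; let this reader exit
		if @Type = 'DEFAULT' and @msg is not null
			exec ExecSQL @msg
		-- other message types (end dialog, error) carry no task;
		-- in every case the target side of the conversation is closed
		end conversation @Handle;
	end
end
go
```

To try it, the queue's procedure_name would have to point at this procedure instead of ActivationTarget.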

Create the Service Broker queues and services using the default contract:

create queue QueueTarget 
	with status = on, 
	activation(
				status = on, 
				procedure_name = ActivationTarget, 
				max_queue_readers = 5, 
				execute as self
			)
go
create service ServiceTarget on queue QueueTarget([DEFAULT])
go
create queue QueueInitiator 
go
create service ServiceInitiator on queue QueueInitiator([DEFAULT])
go
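
To confirm what was just created, the catalog views can be queried. This is a small sketch using sys.service_queues and sys.services; it assumes it is run in the test database:

```sql
-- list user-defined queues with their activation settings and bound services
select q.name as queue_name,
       q.is_receive_enabled,
       q.is_activation_enabled,
       q.activation_procedure,
       q.max_readers,
       s.name as service_name
from sys.service_queues q
join sys.services s on s.service_queue_id = q.object_id
where q.is_ms_shipped = 0
```

QueueTarget should show activation enabled with ActivationTarget and max_readers = 5, while QueueInitiator has no activation procedure at this point.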

Create a wrapper procedure for asynchronous calls:

create procedure ExecAsync (@SQL nvarchar(max), @handle uniqueidentifier = null output)
as
begin
	select @handle = null;
	if rtrim(isnull(@SQL, '')) = ''
		return;
	begin dialog conversation @handle
	from service ServiceInitiator
	to service 'ServiceTarget'
	on contract [DEFAULT]
	with encryption = off;
	send on conversation @handle message type [DEFAULT](@SQL)
	return
end
go
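
A single call is enough to check the round trip. The snippet below is a minimal usage sketch; the statement 'select 1' and the 2-second pause are arbitrary choices for illustration:

```sql
declare @handle uniqueidentifier
exec ExecAsync N'select 1', @handle output
select @handle as conversation_handle

-- give the activation procedure a moment, then look at the log
waitfor delay '00:00:02'
select top (1) * from ExecutionLog order by ExecutinLogID desc

-- consume the reply on the initiator queue and close the initiator side
waitfor(
	receive top (1)
		@handle = conversation_handle
	from QueueInitiator
), timeout 5000;
end conversation @handle;
```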

Now let’s perform a test that consists of two pieces of code. The first invokes 10 asynchronous calls and saves each conversation handle to a temp table for synchronization purposes. The second waits for those 10 invocations to complete.

--test
if object_id('tempdb..#handles') is not null
	drop table #handles
create table #handles (handle uniqueidentifier primary key)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:10'''
declare @i int = 1, @handle uniqueidentifier
while @i<=10
begin
	exec ExecAsync @SQL, @handle output
	insert into #handles values(@handle)
	select @i +=1
end
go
--synchronization
declare @handle uniqueidentifier
while exists(select * from #handles)
begin
	waitfor(
		receive top (1) 
				@handle = conversation_handle 
		from QueueInitiator
	), timeout -1;
	end conversation @handle;
	delete #handles where handle = @handle
end

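The test script returns in under 40 seconds (the log below shows 35 seconds from the first start to the last finish), which is far faster than the 100 seconds needed to run the ten 10-second tasks sequentially.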

select * from ExecutionLog 
select datediff(second, min(StartTime), max(EndTime))  from ExecutionLog 
/*
ExecutinLogID SQL                       StartTime               EndTime
------------- ------------------------- ----------------------- -----------------------
1             waitfor delay '00:00:10'  2012-01-23 10:38:35.210 2012-01-23 10:38:45.213
2             waitfor delay '00:00:10'  2012-01-23 10:38:40.913 2012-01-23 10:38:50.917
3             waitfor delay '00:00:10'  2012-01-23 10:38:45.220 2012-01-23 10:38:55.220
4             waitfor delay '00:00:10'  2012-01-23 10:38:45.230 2012-01-23 10:38:55.230
5             waitfor delay '00:00:10'  2012-01-23 10:38:50.913 2012-01-23 10:39:00.917
6             waitfor delay '00:00:10'  2012-01-23 10:38:50.920 2012-01-23 10:39:00.920
7             waitfor delay '00:00:10'  2012-01-23 10:38:55.227 2012-01-23 10:39:05.227
8             waitfor delay '00:00:10'  2012-01-23 10:38:55.233 2012-01-23 10:39:05.237
9             waitfor delay '00:00:10'  2012-01-23 10:38:55.237 2012-01-23 10:39:05.237
10            waitfor delay '00:00:10'  2012-01-23 10:39:00.920 2012-01-23 10:39:10.920

(10 row(s) affected)


-----------
35

(1 row(s) affected)

*/
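
The log above shows that the tasks did not all start together: start times arrive in waves roughly 5 seconds apart. A query like the following makes that easier to see. It is only a sketch over the ExecutionLog table defined earlier, and the column aliases are illustrative:

```sql
-- offset of each task's start from the first start, plus its own duration
select ExecutinLogID,
       datediff(second, min(StartTime) over (), StartTime) as StartOffsetSec,
       datediff(second, StartTime, EndTime) as DurationSec
from ExecutionLog
order by StartTime
```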

Alternatively, you can put an activation procedure on the initiator queue if you don’t need the synchronization step.

create procedure ActivationInitiator
as
begin
	declare @Handle uniqueidentifier;
	waitfor(
		Receive top (1)
			@Handle = conversation_handle
		from QueueInitiator
	), timeout 50
	if @Handle is null
		return;
	end conversation @Handle;
end
go
alter queue QueueInitiator 
	with status = on, 
	activation(
				status = on, 
				procedure_name = ActivationInitiator, 
				max_queue_readers = 1, 
				execute as self
			)
go
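
With activation on both queues, it can be useful to see which activation procedures are running at a given moment. The sketch below uses the sys.dm_broker_activated_tasks dynamic management view; it assumes the caller has permission to read server state and runs it in the test database:

```sql
-- activation procedures currently running for queues in this database
select t.spid, q.name as queue_name, t.procedure_name
from sys.dm_broker_activated_tasks t
join sys.service_queues q on q.object_id = t.queue_id
where t.database_id = db_id()
```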

Finally, let’s run a query to check the number of conversations in the conversation endpoints:

select COUNT(*) from sys.conversation_endpoints
/*
-----------
10

(1 row(s) affected)
*/

It returns 10. Why are there still 10 conversations in the conversation endpoints even though all of them have ended? This is for security reasons: an ended conversation remains in the conversation endpoints for 30 minutes before it is cleaned up. If you schedule a lot of tasks during the day, you may suffer performance problems because of that 30-minute retention period. In that case, you might consider using a single conversation for all tasks, but then your synchronization strategy needs to change. And what if you need to run tasks in a complex order? I will provide example code in the next post of this series to resolve those issues.
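
To see which endpoints are still being retained, the count above can be broken down by side and state. This is a small sketch against sys.conversation_endpoints; the alias endpoints is illustrative:

```sql
-- group the remaining endpoints by side (initiator/target) and state
select is_initiator, state_desc, far_service, count(*) as endpoints
from sys.conversation_endpoints
group by is_initiator, state_desc, far_service
```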

Parallel Task Scheduling (5) – Task Conveyer
