Jan 27, 2012
 

There are always concerns when writing a multi-threaded program. In the last post, the considerations were the degree of parallelism and tasks starving for workers. The maximum number of threads for one scheduler can be restricted by applying a counting semaphore, as I posted there, but that still does not resolve the worker starvation issue. The application structure in the last post uses a pool of threads managed by Service Broker to schedule tasks issued from different schedulers. With this structure, you can certainly spend time coding the activation procedure or the initiator side so that the workers execute all tasks in a round-robin manner, which eliminates starvation to some degree. Thread starvation will still happen when a large number of long-running tasks are scheduled. We cannot specify a thread quantum to overcome the issue. However, we can use the nature of SQL Server to overcome it.

In general, the idea is to assign one broker queue to each scheduler, so that the semaphore is controlled by MAX_QUEUE_READERS and a scheduler that takes more threads does not interfere with the others. The quantum will be controlled by the SQL Server engine. Let’s only talk about a simplified scenario in this blog post: one scheduler. We are going to rely on one very important feature of Service Broker: messages within a conversation are received in the same order in which they were sent. Does this mean we can only have one thread at the target (activation procedure) side? If your activation procedure is designed so that the entire procedure runs in one transaction, the messages will be processed sequentially regardless of the value of MAX_QUEUE_READERS, because RECEIVE keeps the conversation locked until the transaction ends.

create procedure QueueActivation
as
begin
	begin transaction
	receive...
	...
	--Your Work (still inside the transaction, so no other reader can receive from this conversation)
	...
	commit 
end

But if your message processor is designed this way, the situation changes. Once the transaction is committed, the next message can be received by another activation thread while the prior thread is still doing “Your Work”.

create procedure QueueActivation
as
begin
	begin transaction
	receive...
	commit 
	...
	--Your Work (after the commit, so other readers can receive the next message meanwhile)
	...
end

We will send the tasks with the same order number to Service Broker sequentially. The tasks that need to run after the first set will also be sent sequentially, after all tasks in the previous order have been sent. The tasks will therefore be received at the target in order. We can put a binary semaphore on the initiator side to allow only one thread to send a task at a time. At the target, we need to ensure that all tasks with the same order number, for instance 100, can run together, whereas tasks with order number 200 must wait until all the 100s complete. While tasks with order 100 are running, tasks tagged with 400, 500, or even higher are already in the queue, so those tasks must wait. We don’t want to keep a status table and loop over it, searching for what can run next.
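
The binary semaphore on the initiator side could be built from an application lock as well. The following is only a sketch under assumptions: the resource name 'SchedulerQueue - Send' and the sample message are made up for illustration, and ExecAsync is the procedure defined later in this post.

```sql
declare @msg xml = '<Command SQL="select 1" CurrentOrder="100" PreviousOrder="0" />',
		@rc int

begin transaction
-- An Exclusive application lock conflicts with itself, so it behaves like a binary semaphore.
-- The resource name is hypothetical; any agreed-upon string works.
exec @rc = sp_getapplock @Resource = 'SchedulerQueue - Send', @LockMode = 'Exclusive',
	@LockOwner = 'Transaction', @LockTimeout = -1
if @rc >= 0
begin
	exec ExecAsync @msg
	commit		-- the send is committed and the transaction-owned lock is released
end
else
	rollback	-- lock not granted (for example, chosen as a deadlock victim)
```

Because the lock is owned by the transaction, only one sender at a time can be between acquiring the lock and the commit, which keeps the sends serialized.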

The solution was worked out in my previous post, Locking and Blocking (11) – Process Control and Customized Locking Hierarchies. We are going to use application locks. While a task is running, it holds a lock on a resource for the duration of its execution. Tasks with the same order share the same lock resource and don’t conflict with each other. Tasks in the next order share a different lock resource, but must conflict with the lock held by the previous order. So we need two lock types that conflict with each other but not with themselves. I use Intent Exclusive and Shared locks, because each is compatible with itself but they are incompatible with each other.
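
The compatibility of the two lock modes can be checked with a small two-session experiment. This is a minimal sketch, not part of the implementation: the resource name 'SchedulerQueue - demo' is made up, and it assumes both sessions are connected to the same database and use the default 'public' principal.

```sql
-- Session 1: hold a Shared application lock on a hypothetical resource.
exec sp_getapplock @Resource = 'SchedulerQueue - demo', @LockMode = 'Shared',
	@LockOwner = 'Session', @LockTimeout = 0

-- Session 2 (a different connection to the same database):
-- APPLOCK_TEST returns 1 if the lock could be granted right now, 0 if it would have to wait.
select	APPLOCK_TEST('public', 'SchedulerQueue - demo', 'Shared', 'Session') [SharedVsShared],
		APPLOCK_TEST('public', 'SchedulerQueue - demo', 'IntentExclusive', 'Session') [IntentExclusiveVsShared]

-- Session 1: release the lock when done.
exec sp_releaseapplock @Resource = 'SchedulerQueue - demo', @LockOwner = 'Session'
```

While session 1 holds the Shared lock, session 2 should see 1 for the Shared request and 0 for the Intent Exclusive request. The test has to run from a second connection, because a session does not block on locks it owns itself.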

Scheduled By Activator

The process design of the activator is illustrated above. When an activation procedure starts, it begins a transaction and starts receiving a message (task). Once a message is received, a Shared lock is placed on the current order before the transaction is committed. This prevents other activators from receiving tasks before the lock has been granted to the first (and only) receiver. Once the Shared lock is granted, the task is secured. The procedure then determines whether it may start the task by checking for tasks in the previous order: it requests an Intent Exclusive lock on the previous order. If any tasks of the previous order are still running, they hold Shared locks on that resource, so the current task waits until all of those Shared locks are released. Once all required locks are granted, the task can be launched. After it finishes, all locks acquired along the way are released.

Another requirement of this design is that when a message is received, we must know the order of the previous task. This is simple: we can always embed it in the message. See the implementation below.
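
Ahead of the full implementation, a minimal sketch of just the message format may help. It builds one message and reads it back with the same expressions the implementation uses; the statement text and the order values 200 and 100 are sample values chosen for illustration.

```sql
declare @msg xml, @SQL nvarchar(max) = 'waitfor delay ''00:00:01''',
		@CurrentOrder int = 200, @PreviousOrder int = 100

-- Build the message: one <Command> element carrying the three values as attributes.
select @msg = (select @SQL [@SQL], @CurrentOrder [@CurrentOrder], @PreviousOrder [@PreviousOrder] for xml path('Command'))

-- Read the values back the same way the activation procedure does.
select	@msg [Message],
		@msg.value('(//@SQL)[1]', 'nvarchar(max)') [SQL],
		@msg.value('(//@CurrentOrder)[1]', 'int') [CurrentOrder],
		@msg.value('(//@PreviousOrder)[1]', 'int') [PreviousOrder]
```

The receiver needs nothing but the message itself to build both lock resource names, which is what lets it avoid a status table.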

create table ExecutoinLog
(
	ExecutinLogID int identity(1,1) primary key,
	SQL nvarchar(max),
	StartTime datetime2(7) default(sysdatetime()),
	EndTime datetime2(7)
)
go
create procedure ExecSQL(@SQL nvarchar(max))
as
begin
	set nocount on
	declare @ExecutionLogID int
	insert into ExecutoinLog (SQL)
		values(@SQL)
	select @ExecutionLogID = SCOPE_IDENTITY();
	exec(@SQL)
	update ExecutoinLog 
		set EndTime = sysdatetime()
	where ExecutinLogID = @ExecutionLogID
end

go
create procedure ActivationSchedulerQueue
as
begin
	declare @Handle uniqueidentifier, @Type sysname, @msg xml
	begin transaction
	waitfor(
		Receive top (1)
			@Handle = conversation_handle,
			@Type = message_type_name,
			@msg = cast(message_body as xml)
		from SchedulerQueue
	), timeout 500
	if @Handle is null
	begin
		rollback
		return;
	end
	if @Type = 'DEFAULT' and @msg is not null
	begin
		declare @SQL nvarchar(max), @PreviousOrder int, @CurrentOrder int, 
				@ResourcePrevious nvarchar(255),
				@ResourceCurrent nvarchar(255)
		select	@SQL = @msg.value('(//@SQL)[1]', 'nvarchar(max)'),
				@PreviousOrder = @msg.value('(//@PreviousOrder)[1]', 'int'),
				@CurrentOrder = @msg.value('(//@CurrentOrder)[1]', 'int')
				
		select @ResourceCurrent = 'SchedulerQueue - ' + CAST(@CurrentOrder as varchar(20))
		exec sp_getapplock @Resource = @ResourceCurrent,  @LockMode = 'Shared', @LockOwner = 'Session', @LockTimeout = -1
		
		commit
		
		select @ResourcePrevious = 'SchedulerQueue - ' + CAST(@PreviousOrder as varchar(20))
		exec sp_getapplock @Resource = @ResourcePrevious,  @LockMode = 'IntentExclusive', @LockOwner = 'Session', @LockTimeout = -1
		
		begin try
			exec ExecSQL @SQL
		end try
		begin catch
			-- place your error handling code here
		end catch
		
		exec sp_releaseapplock @ResourcePrevious, 'Session'
		exec sp_releaseapplock @ResourceCurrent, 'Session'
	end
	else
	begin
		commit
		end conversation @Handle;
	end
end
go
create queue SchedulerQueue 
	with status = on, 
	activation(
				status = on, 
				procedure_name = ActivationSchedulerQueue, 
				max_queue_readers = 5, 
				execute as self
				)
go
create service ServiceScheduler on queue SchedulerQueue([DEFAULT])
go
create procedure ExecAsync (@msg xml)
as
begin
	if @msg is null
		return;
	declare @handle uniqueidentifier
	select @handle = conversation_handle 
	from sys.conversation_endpoints 
	where is_initiator = 1 
		and far_service = 'ServiceScheduler'
		and state <> 'CD'
	if @@ROWCOUNT = 0
	begin
		begin dialog conversation @handle
		from service ServiceScheduler
		to service  'ServiceScheduler'
		on contract [DEFAULT]
		with encryption = off;
	end;
	send on conversation @handle message type [DEFAULT](@msg)
	return
end
go

The test code is:

--test
if object_id('tempdb..#SQLs') is not null
	drop table #SQLs
create table #SQLs (SQL nvarchar(max) not null, ExecutionOrder int not null)
create clustered index ExecutionOrder on #SQLs(ExecutionOrder)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:10.100'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:09.200'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 200)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 200)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:08.300'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 300)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:07.400'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 400)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 400)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:06.500'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 500)
go
select * from #SQLs
go
set nocount on
declare @SQL nvarchar(max), @CurrentOrder int, @PreviousOrder int, @msg xml
declare c cursor for
	select  SQL [SQL], ExecutionOrder [CurrentOrder], 
			isnull(
				(select top 1 
					ExecutionOrder 
				from #SQLs s2
				where s1.ExecutionOrder > s2.ExecutionOrder
				order by ExecutionOrder desc
				), 0) [PreviousOrder]
	from #SQLs s1
	order by ExecutionOrder
open c
fetch next from c into @SQL, @CurrentOrder, @PreviousOrder
while @@fetch_status = 0
begin
	select @msg = (select @SQL [@SQL], @CurrentOrder [@CurrentOrder], @PreviousOrder [@PreviousOrder] for xml path('Command'));
	--select @msg
	exec ExecAsync @msg
	fetch next from c into @SQL, @CurrentOrder, @PreviousOrder
end
close c
deallocate c
go
select * from #SQLs
select * from ExecutoinLog 
select * from sys.conversation_endpoints
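
Once all the test tasks have finished, the log can be summarized to check the ordering. This is a sketch that leans on an assumption specific to this test: each order uses its own delay value, so grouping by the statement text stands in for grouping by order (the log table itself does not store the order).

```sql
-- One row per distinct statement text, which in this test means one row per order.
select	cast(SQL as nvarchar(4000)) [SQL],
		count(*) [Tasks],
		min(StartTime) [FirstStart],
		max(EndTime) [LastEnd]
from ExecutoinLog
group by cast(SQL as nvarchar(4000))
order by min(StartTime)
```

If the locking works as intended, the FirstStart of each row should not be earlier than the LastEnd of the row before it.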

We don’t have a GetScheduler procedure here. If you need to use this structure to serve multiple callers, each caller will need its own Service Broker service and queue, created with almost the same code. You can create a number of them in advance. If the application runs out of schedulers, you can add more in the GetScheduler procedure. One thing to be aware of: when you increase MAX_QUEUE_READERS, the number of threads will increase, but when you decrease it, the total number of threads will NOT be reduced immediately; see the blog post here.
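
For reference, the reader limit can be changed on the existing queue, and the activation procedures currently running can be listed from a dynamic management view. This is a minimal sketch: the value 10 is arbitrary, and querying the view assumes the login has VIEW SERVER STATE permission.

```sql
-- Raise or lower the reader limit on the queue defined above.
alter queue SchedulerQueue
	with activation (max_queue_readers = 10)

-- List activation procedures currently running for this queue.
select t.spid, t.procedure_name, q.name [QueueName]
from sys.dm_broker_activated_tasks t
	inner join sys.service_queues q on q.object_id = t.queue_id
where t.database_id = db_id()
	and q.name = 'SchedulerQueue'
```

Running the second query before and after lowering the limit is one way to observe that already-started readers are not stopped right away.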
