There are always concerns when writing a multithreaded program. In the last post, the considerations were the degree of parallelism and tasks being starved of workers. The maximum number of threads for one scheduler can be restricted by applying a counting semaphore, as I showed there. But that still does not resolve the worker starvation issue. The application structure in the last post uses a pool of threads managed by Service Broker to schedule tasks issued from different schedulers. With this structure, you can certainly invest effort in the activation procedure or on the initiator side so that workers execute tasks in a round-robin manner, which reduces starvation to some degree. Thread starvation will still happen when a large number of long-running tasks are scheduled, and we cannot specify a thread quantum to overcome the issue. However, we can take advantage of the nature of SQL Server to overcome it.
In general, the idea is that I assign one broker queue to each scheduler. The semaphore is then controlled by MAX_QUEUE_READERS, and one scheduler taking more threads does not interfere with the other schedulers. The quantum is controlled by the SQL Server engine. In this post, let's only talk about a simplified scenario: one scheduler. We are going to rely on one very important Service Broker feature: within a conversation, messages are received in the same order in which they were sent. Does this mean we can only have one thread on the target (activation procedure) side? If your activation procedure is designed so that the entire procedure runs in one transaction, the messages will be processed sequentially regardless of the MAX_QUEUE_READERS value. This is because RECEIVE keeps the conversation group locked until the transaction ends.
create procedure QueueActivation
as
begin
    begin transaction
    receive ...
    ...
    --Your Work
    ...
    commit
end
But if your message processor is designed in the following way, the situation changes. Once the transaction is committed, the next message can be received by another activation thread while the prior thread is still doing "Your Work".
create procedure QueueActivation
as
begin
    begin transaction
    receive ...
    commit
    ...
    --Your Work
    ...
end
We will send the tasks with the same order number to Service Broker sequentially. Tasks that need to run after the first set are sent afterwards, also sequentially, once all tasks of the previous order have been sent. The tasks will be received at the target in that order. We can put a binary semaphore on the initiator side to allow only one thread to send tasks at a time. At the target, we need to ensure that all tasks with the same order number, for instance 100, can run together. Tasks with order number 200 must not start until all tasks with order 100 complete. While tasks with order 100 are running, tasks tagged 400, 500, or even higher may already be in the queue, so those tasks must wait. We don't want to maintain a status table and keep looping over it and searching it.
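A binary semaphore on the initiator side can be built from the same application-lock mechanism that the target uses. The batch below is a minimal sketch, assuming a hypothetical resource name 'SchedulerQueue - Sender'. It takes an Exclusive, transaction-owned lock, so only one session at a time can be inside the sending section, and the lock is released automatically at commit.

```sql
-- Minimal sketch: binary semaphore on the initiator side.
-- 'SchedulerQueue - Sender' is a hypothetical resource name chosen for this example.
set nocount on;
declare @rc int;

begin transaction;

-- Exclusive is incompatible with itself, so a second session blocks here
-- until the first one commits or rolls back.
exec @rc = sp_getapplock
    @Resource = N'SchedulerQueue - Sender',
    @LockMode = N'Exclusive',
    @LockOwner = N'Transaction',
    @LockTimeout = -1;

-- Negative return codes mean the lock was not granted (timeout, cancel, deadlock, error)
if @rc < 0
begin
    rollback;
    throw 50000, N'Could not acquire the sender lock.', 1;
end;

-- Critical section: send all tasks of one order here, e.g. by calling ExecAsync for each task.
select APPLOCK_MODE(N'public', N'SchedulerQueue - Sender', N'Transaction') as HeldMode;

commit;  -- a transaction-owned application lock is released at commit
```

Wrapping a whole batch of sends in one such transaction, rather than each single send, keeps the tasks of one order from interleaving with another caller's tasks.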
The solution was worked out in a previous post, Locking and Blocking (11) – Process Control and Customized Locking Hierarchies. We are going to use application locks. While a task is running, it holds a lock on a resource. Tasks with the same order share the same lock resource and must not conflict with each other. Tasks in the next order use a different lock resource, but they must conflict with the locks held by the previous order. So we need two lock modes that conflict with each other but not with themselves. I use Intent Exclusive and Shared locks, because each is compatible with itself but they conflict with each other.
The process design of the activator is illustrated. When an activation procedure starts, it begins a transaction and receives a message (task). Once a message is received, a Shared lock is placed on the current order before the transaction is committed. This prevents other activators from receiving tasks before a lock has been granted to the first (and only) receiver, because RECEIVE keeps the conversation group locked until the commit. Once the Shared lock is granted, the task's place in the sequence is secured. The task then checks whether it may start by requesting an Intent Exclusive lock on the previous order. If any task of the previous order is still pending or running, it holds a Shared lock on the previous order. The current task must then wait until all Shared locks on the previous order are released. Once all required locks are granted, the task is launched. After it finishes, all locks acquired during its execution are released.
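To see what one task holds while it runs, the batch below takes the same two session-owned locks that a task of order 200 would take and lists them from sys.dm_tran_locks. Those are Shared on its own order and IntentExclusive on order 100. It is a minimal single-session sketch for inspection only; the waiting itself only appears when a different session holds Shared on 'SchedulerQueue - 100'.

```sql
set nocount on;
declare @rc int;

-- Step 1: Shared on the task's own order
exec @rc = sp_getapplock @Resource = N'SchedulerQueue - 200', @LockMode = N'Shared',
    @LockOwner = N'Session', @LockTimeout = -1;
-- sp_getapplock returns 0 (granted) or 1 (granted after waiting); negative values are failures
if @rc < 0 throw 50000, N'Shared lock on current order failed.', 1;

-- Step 2: IntentExclusive on the previous order; in the real activator this waits
-- until every task of order 100 has released its Shared lock
exec @rc = sp_getapplock @Resource = N'SchedulerQueue - 100', @LockMode = N'IntentExclusive',
    @LockOwner = N'Session', @LockTimeout = -1;
if @rc < 0 throw 50000, N'IntentExclusive lock on previous order failed.', 1;

-- Both locks are owned by the session: request_mode shows S and IX, request_owner_type shows SESSION
select request_mode, request_owner_type, resource_description
from sys.dm_tran_locks
where request_session_id = @@SPID and resource_type = N'APPLICATION';

-- Step 3: after the task finishes, release both locks
exec sp_releaseapplock @Resource = N'SchedulerQueue - 100', @LockOwner = N'Session';
exec sp_releaseapplock @Resource = N'SchedulerQueue - 200', @LockOwner = N'Session';
```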
Another requirement of this design is that, when a message is received, we must know the order of the previous task. This is simple: we can always embed it as part of the message. See the implementation below.
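The previous order can be computed when the tasks are queued. The test code further down does this with a correlated subquery inside a cursor. The batch below is an alternative sketch that assumes SQL Server 2012 or later for LAG; the task list is hypothetical. It derives PreviousOrder from the distinct order numbers and builds one message per task, with the SQL, CurrentOrder and PreviousOrder attributes that the activation procedure reads.

```sql
set nocount on;
-- Hypothetical task list; order numbers need not be consecutive
declare @Tasks table (TaskID int identity(1,1), SQL nvarchar(max) not null, ExecutionOrder int not null);
insert into @Tasks (SQL, ExecutionOrder) values
    (N'waitfor delay ''00:00:01''', 100),
    (N'waitfor delay ''00:00:01''', 100),
    (N'waitfor delay ''00:00:02''', 200),
    (N'waitfor delay ''00:00:03''', 400);

declare @Messages table (TaskID int, CurrentOrder int, PreviousOrder int, Msg xml);

-- LAG over the distinct orders gives each order its predecessor; 0 means "no previous order"
with Orders as (
    select ExecutionOrder,
           lag(ExecutionOrder, 1, 0) over (order by ExecutionOrder) as PreviousOrder
    from (select distinct ExecutionOrder from @Tasks) as d
)
insert into @Messages (TaskID, CurrentOrder, PreviousOrder, Msg)
select t.TaskID, t.ExecutionOrder, o.PreviousOrder,
       (select t.SQL as [@SQL], t.ExecutionOrder as [@CurrentOrder], o.PreviousOrder as [@PreviousOrder]
        for xml path('Command'), type)
from @Tasks as t
join Orders as o on o.ExecutionOrder = t.ExecutionOrder;

-- Read the values back with the same XPath expressions the activation procedure uses
select TaskID, CurrentOrder,
       Msg.value('(//@PreviousOrder)[1]', 'int') as PreviousOrderFromMessage,
       Msg.value('(//@SQL)[1]', 'nvarchar(max)') as SQLFromMessage
from @Messages
order by CurrentOrder, TaskID;
```

Because order 300 is absent here, the task of order 400 points at 200. Chaining to the previous existing order is what keeps gaps in the numbering harmless.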
create table ExecutoinLog
(
    ExecutinLogID int identity(1,1) primary key,
    SQL nvarchar(max),
    StartTime datetime2(7) default(sysdatetime()),
    EndTime datetime2(7)
)
go
-- Runs one task and records its start and end time
create procedure ExecSQL(@SQL nvarchar(max))
as
begin
    set nocount on
    declare @ExecutionLogID int
    insert into ExecutoinLog (SQL) values(@SQL)
    select @ExecutionLogID = SCOPE_IDENTITY();
    exec(@SQL)
    update ExecutoinLog set EndTime = sysdatetime() where ExecutinLogID = @ExecutionLogID
end
go
-- Activation procedure: receives one task, takes the application locks, then runs the task outside the receive transaction
create procedure ActivationSchedulerQueue
as
begin
    declare @Handle uniqueidentifier, @Type sysname, @msg xml
    begin transaction
    waitfor(
        receive top (1)
            @Handle = conversation_handle,
            @Type = message_type_name,
            @msg = cast(message_body as xml)
        from SchedulerQueue
    ), timeout 500
    if @Handle is null
    begin
        rollback
        return;
    end
    if @Type = 'DEFAULT' and @msg is not null
    begin
        declare @SQL nvarchar(max), @PreviousOrder int, @CurrentOrder int,
                @ResourcePrevious nvarchar(255), @ResourceCurrent nvarchar(255)
        select @SQL = @msg.value('(//@SQL)[1]', 'nvarchar(max)'),
               @PreviousOrder = @msg.value('(//@PreviousOrder)[1]', 'int'),
               @CurrentOrder = @msg.value('(//@CurrentOrder)[1]', 'int')
        -- Shared lock on the current order, taken while RECEIVE still holds the conversation group lock,
        -- so no later task can be received before this one holds its lock
        select @ResourceCurrent = 'SchedulerQueue - ' + CAST(@CurrentOrder as varchar(20))
        exec sp_getapplock @Resource = @ResourceCurrent, @LockMode = 'Shared', @LockOwner = 'Session', @LockTimeout = -1
        commit
        -- IntentExclusive on the previous order waits until every task of that order has released its Shared lock
        select @ResourcePrevious = 'SchedulerQueue - ' + CAST(@PreviousOrder as varchar(20))
        exec sp_getapplock @Resource = @ResourcePrevious, @LockMode = 'IntentExclusive', @LockOwner = 'Session', @LockTimeout = -1
        begin try
            exec ExecSQL @SQL
        end try
        begin catch
            -- place your error handling code here
        end catch
        exec sp_releaseapplock @ResourcePrevious, 'Session'
        exec sp_releaseapplock @ResourceCurrent, 'Session'
    end
    else
    begin
        -- EndDialog, Error or empty message: close this side of the conversation
        commit;
        end conversation @Handle;
    end
end
go
create queue SchedulerQueue
with status = on,
activation(
    status = on,
    procedure_name = ActivationSchedulerQueue,
    max_queue_readers = 5,
    execute as self
)
go
create service ServiceScheduler on queue SchedulerQueue([DEFAULT])
go
-- Sends a task on a single, reused conversation so that the target receives tasks in sending order
create procedure ExecAsync (@msg xml)
as
begin
    if @msg is null return;
    declare @handle uniqueidentifier
    select @handle = conversation_handle
    from sys.conversation_endpoints
    where is_initiator = 1 and far_service = 'ServiceScheduler' and state <> 'CD'
    if @@ROWCOUNT = 0
    begin
        begin dialog conversation @handle
            from service ServiceScheduler
            to service 'ServiceScheduler'
            on contract [DEFAULT]
            with encryption = off;
    end;
    send on conversation @handle message type [DEFAULT](@msg)
    return
end
go
The test code is:
--test
if object_id('tempdb..#SQLs') is not null drop table #SQLs
create table #SQLs (SQL nvarchar(max) not null, ExecutionOrder int not null)
create clustered index ExecutionOrder on #SQLs(ExecutionOrder)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:10.100'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 100)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:09.200'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 200)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 200)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:08.300'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 300)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:07.400'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 400)
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 400)
go
declare @SQL nvarchar(max) = 'waitfor delay ''00:00:06.500'''
insert into #SQLs(SQL, ExecutionOrder) values(@SQL, 500)
go
select * from #SQLs
go
set nocount on
declare @SQL nvarchar(max), @CurrentOrder int, @PreviousOrder int, @msg xml
declare c cursor for
    select SQL [SQL], ExecutionOrder [CurrentOrder],
           isnull((select top 1 ExecutionOrder
                   from #SQLs s2
                   where s1.ExecutionOrder > s2.ExecutionOrder
                   order by ExecutionOrder desc), 0) [PreviousOrder]
    from #SQLs s1
    order by ExecutionOrder
open c
fetch next from c into @SQL, @CurrentOrder, @PreviousOrder
while @@fetch_status = 0
begin
    select @msg = (select @SQL [@SQL], @CurrentOrder [@CurrentOrder], @PreviousOrder [@PreviousOrder] for xml path('Command'));
    --select @msg
    exec ExecAsync @msg
    fetch next from c into @SQL, @CurrentOrder, @PreviousOrder
end
close c
deallocate c
go
select * from #SQLs
select * from ExecutoinLog
select * from sys.conversation_endpoints
We don't have the GetScheduler procedure here. If you need this structure to serve multiple callers, each caller will need its own Service Broker queue, service and activation procedure, created with almost the same code. You can create a number of them in advance. If the application runs out of schedulers, you can add schedulers in the GetScheduler procedure. One thing you need to be aware of: when you increase MAX_QUEUE_READERS, the number of threads increases. When you decrease it, however, the total number of threads will NOT be reduced immediately; see the blog post here.
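The reader limit is changed with ALTER QUEUE. The activation tasks that are actually running can be watched through sys.dm_broker_activated_tasks. This is a sketch that assumes the SchedulerQueue created above exists in the current database; the value 2 is only an example. After lowering the limit, the last query may keep showing more running tasks than the new limit until the existing readers exit.

```sql
-- Lower (or raise) the number of concurrent activation readers for this scheduler
alter queue SchedulerQueue with activation (max_queue_readers = 2);

-- Configured limit
select name, is_activation_enabled, activation_procedure, max_readers
from sys.service_queues
where name = N'SchedulerQueue';

-- Activation tasks currently running for this queue
select t.spid, t.procedure_name
from sys.dm_broker_activated_tasks as t
where t.database_id = db_id()
  and t.queue_id = object_id(N'SchedulerQueue');
```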