Concurrent reads and updates
Problem
Let's say I have a table with 50,000 transactions that need to be processed in the background, in the order they were received. I have a Windows service that polls for transactions, picks them up, and updates the transaction table to record that the row was picked up by this instance at a particular time. All of this works perfectly well.
Now suppose I want to scale out the processing and run another instance of the same service on another machine. How do I ensure that the two instances do not pick up the same records?
Either both instances pick up the same record and one overwrites the other's update, or, if I add filters to the update query to prevent that, one instance picks up and updates some records while the other keeps running but cannot do anything, simply because the record has already been updated.
Is there a way to solve this?
Solution
My favourite way of achieving this is the OUTPUT clause. Here is an example:
SET NOCOUNT ON;
-- example setup for the queue table
DECLARE @queue TABLE (
    transaction_id int PRIMARY KEY,
    processed bit
);
-- some sample data
INSERT INTO @queue
VALUES
    (1,0),
    (2,0),
    (3,0),
    (4,0),
    (5,0);
Now the processing thread:
-- processing thread
WHILE 1 = 1
BEGIN
    DECLARE @transaction_id int;
    DECLARE @row_to_process TABLE (
        transaction_id int
    );
    BEGIN TRAN;
    BEGIN TRY
        -- clear the id captured by the previous iteration
        DELETE FROM @row_to_process;
        -- mark the oldest unprocessed row and capture its id in one statement
        UPDATE Q
        SET processed = 1
        OUTPUT inserted.transaction_id
        INTO @row_to_process
        FROM (
            SELECT TOP(1) transaction_id, processed
            FROM @queue
            WHERE processed = 0
            ORDER BY transaction_id
        ) AS Q;
        -- nothing left to process
        IF @@ROWCOUNT = 0
        BEGIN
            COMMIT;
            -- sleep for 5 seconds then restart the loop
            WAITFOR DELAY '00:00:05';
            CONTINUE;
        END
        SELECT @transaction_id = transaction_id
        FROM @row_to_process;
        IF @transaction_id IS NOT NULL
        BEGIN
            RAISERROR(N'processing transaction N. %d.',1,1, @transaction_id) WITH NOWAIT;
            EXEC whatever_processes_the_row @transaction_id;
        END
        COMMIT;
    END TRY
    BEGIN CATCH
        -- undo the update if processing failed, then re-raise the error
        ROLLBACK;
        THROW;
    END CATCH
END
If you're marking the row as processed by deleting it, it works in the same way: the only thing you have to do is change the UPDATE statement into a DELETE statement and pull the current @transaction_id from the DELETED logical table.
Code Snippets
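The DELETE variant described in the solution can be sketched roughly as follows. This is a minimal illustration, not the answerer's own code: it assumes the same @queue table variable as the example, simplified to drop the processed column (that simplification and the three sample rows are assumptions), and it shows a single iteration rather than the full polling loop.

```sql
SET NOCOUNT ON;

-- assumed setup: same queue as the example, minus the processed flag
DECLARE @queue TABLE (
    transaction_id int PRIMARY KEY
);
INSERT INTO @queue
VALUES (1), (2), (3);

DECLARE @transaction_id int;
DECLARE @row_to_process TABLE (
    transaction_id int
);

BEGIN TRAN;

-- remove the oldest row and capture its id from the DELETED logical table
DELETE Q
OUTPUT deleted.transaction_id
INTO @row_to_process
FROM (
    SELECT TOP(1) transaction_id
    FROM @queue
    ORDER BY transaction_id
) AS Q;

SELECT @transaction_id = transaction_id
FROM @row_to_process;

-- the processing procedure would be called here with @transaction_id
COMMIT;
```

As in the UPDATE version, the row is claimed and its id is returned by one statement, so there is no gap between reading the row and marking it as taken.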
Context
StackExchange Database Administrators Q#142457, answer score: 2