HiveBrain v1.2.0
Get Started
← Back to all entries
patternsqlMinor

Concurrent reads and updates

Submitted by: @import:stackexchange-dba··
0
Viewed 0 times
andreadsconcurrentupdates

Problem

Let's say I have a table with 50,000 transactions that need to be processed in the background in the same order they were received. I have a Windows service that polls for transactions, picks them up, updates the transaction table that that record is picked by this instance at a particular time. All this works perfectly well.

Now, suppose I want to scale out the processing and run another instance of the same service on another machine, how do I ensure that both instances do not pick up the same records?

I either have both instances picking the same record and one instance overwriting update of the other or if I have additional filters in the update query to prevent that I have one instance picking up some records and updating it and the other instance running but not able to do anything simply because the record is already updated.

Is there a way to solve this?

Solution

My favourite way of achieving this is the OUTPUT clause. Here is an example:

SET NOCOUNT ON;

-- example setup for the queue table
DECLARE @queue TABLE (
    transaction_id int PRIMARY KEY,
    processed bit
);

-- some sample data
INSERT INTO @queue 
VALUES
    (1,0),
    (2,0),
    (3,0),
    (4,0),
    (5,0);


Now the processing thread:

-- processing thread
WHILE 1 = 1
BEGIN 
    DECLARE @transaction_id int;

    DECLARE @row_to_process TABLE (
        transaction_id int
    );

    BEGIN TRAN;

    BEGIN TRY

        DELETE FROM @row_to_process;

        UPDATE Q
        SET processed = 1
        OUTPUT inserted.transaction_id 
            INTO @row_to_process
        FROM (
            SELECT TOP(1) transaction_id, processed
            FROM @queue
            WHERE processed = 0
            ORDER BY transaction_id
        ) AS Q;

        IF @@ROWCOUNT = 0
        BEGIN
            COMMIT;
            -- sleep for 5 seconds then restart the loop
            WAITFOR DELAY '00:00:05'; 
            CONTINUE;
        END

        SELECT @transaction_id = transaction_id
        FROM @row_to_process;

        IF @transaction_id IS NOT NULL
        BEGIN
            RAISERROR(N'processing transaction N. %d.',1,1, @transaction_id) WITH NOWAIT;
            EXEC whatever_processes_the_row @transaction_id;
        END

        COMMIT;

    END TRY
    BEGIN CATCH
        ROLLBACK;
        THROW;
    END CATCH
END


If you're marking the row as processed by deleting it, it works in the same way: the only thing you have to do is change the UPDATE statement into a DELETE statement and pull the current @transaction_id from the DELETED logical table.

Code Snippets

SET NOCOUNT ON;

-- example setup for the queue table
DECLARE @queue TABLE (
    transaction_id int PRIMARY KEY,
    processed bit
);


-- some sample data
INSERT INTO @queue 
VALUES
    (1,0),
    (2,0),
    (3,0),
    (4,0),
    (5,0);
-- processing thread
WHILE 1 = 1
BEGIN 
    DECLARE @transaction_id int;

    DECLARE @row_to_process TABLE (
        transaction_id int
    );

    BEGIN TRAN;

    BEGIN TRY

        DELETE FROM @row_to_process;

        UPDATE Q
        SET processed = 1
        OUTPUT inserted.transaction_id 
            INTO @row_to_process
        FROM (
            SELECT TOP(1) transaction_id, processed
            FROM @queue
            WHERE processed = 0
            ORDER BY transaction_id
        ) AS Q;

        IF @@ROWCOUNT = 0
        BEGIN
            COMMIT;
            -- sleep for 5 seconds then restart the loop
            WAITFOR DELAY '00:00:05'; 
            CONTINUE;
        END

        SELECT @transaction_id = transaction_id
        FROM @row_to_process;

        IF @transaction_id IS NOT NULL
        BEGIN
            RAISERROR(N'processing transaction N. %d.',1,1, @transaction_id) WITH NOWAIT;
            EXEC whatever_processes_the_row @transaction_id;
        END

        COMMIT;

    END TRY
    BEGIN CATCH
        ROLLBACK;
        THROW;
    END CATCH
END

Context

StackExchange Database Administrators Q#142457, answer score: 2

Revisions (0)

No revisions yet.