Redis Message Queue (Cloud)

Submitted by: @import:stackexchange-codereview

Problem

I am in the process of writing a cloud application (mostly as a hobby and for learning) that requires a very fast cache, queue, and messaging system. I looked at a few options from Microsoft (hosting is on Azure), and all of them seemed slow for my needs. Then I tried Redis, and the speed was right where I needed it to be.

I also want to keep the number of components I depend on to a minimum: if I need to move from Azure to bare metal, I can always host my own Redis.

I decided, for both learning and for sport, to write a queue system that can work as either an AtMostOnce or an AtLeastOnce queue and that stays reliable across system failures. This class should also be able to run on multiple machines (in this case, worker roles) and be instantiated either through IoC or manually.

The following is what I have so far, before I take care of some of the things I have not implemented yet (CancellationTokens and a shared ConnectionMultiplexer, for instance). The code does work: I have tested it on 3 different WorkerRole instances, including crashes and reboots. My concerns are aimed more at problems that I don't see, performance issues, and my lack of general experience. Feel free to tell me if I'm doing anything wrong, but be aware that I know there are packages out there already. I just like to do things myself.

```
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;

namespace CitySurvival.WorkerCommon
{
///
/// Needed: (2) Redis queues 1 for new messages, 1 for currently processing messages
/// Needed: processing messages list is FILO
///
/// The queues will only contain the key to the message in redis, which is stored as
/// a single entity for quick lookup
///
/// jobQueue -- processingQueue
/// job:1 job:2
///
/// job:1 (jo
```

Solution

- Dead code, like that in the constructor arguments for RedisJobQueue, should be deleted. To keep track of changes, use a version control system such as Git or SVN.

- You are using C# 6.0, so you can use string interpolation with the $ prefix, like so:

public RedisJobQueue(string jobName)
{
    _jobQueue =  $"{jobName}:jobs";
    _processingQueue = $"{jobName}:process";
    _subChannel = $"{jobName}:channel";
    _jobName = jobName;
}
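
As a small self-contained illustration (the `QueueKeys` class and the `"email"` job name are hypothetical and not part of the reviewed code), an interpolated string yields the same key as the equivalent `string.Format` call or plain concatenation:

```csharp
using System;

public static class QueueKeys
{
    // Hypothetical helper: builds the Redis key of the job list for a queue name.
    public static string Jobs(string jobName) => $"{jobName}:jobs";

    public static void Main()
    {
        string jobName = "email"; // illustrative value only

        string interpolated = Jobs(jobName);
        string formatted = string.Format("{0}:jobs", jobName);
        string concatenated = jobName + ":jobs";

        // All three spellings produce the same key.
        Console.WriteLine(interpolated == formatted && formatted == concatenated); // prints "True"
    }
}
```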


- Where possible, reduce nesting (and with it horizontal spacing) by using guard conditions. For the Finish() method this is easy: invert the condition of the if and return early, like so:

public async Task Finish(string key, bool failed = false)
{
    var db = Database;
    await db.ListRemoveAsync(_processingQueue, key);

    if (!failed)
    {
        await db.KeyDeleteAsync(key);
        return;
    }

    // How many times to fail before dead
    if (await db.HashExistsAsync(key, "failedcount"))
    {
        var count = await db.HashGetAsync(key, "failedcount");
        if (count.IsInteger)
        {
            if ((int) count >= 10)
            {
                // for now, delete the key, later we might integrate a dead message
                // queue
                await db.KeyDeleteAsync(key);
                return;
            }
        }
    }

    db.HashIncrement(key, "failedcount");
    db.HashDelete(key, "active");
    db.ListRightPush(_jobQueue, key);

    ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
}


You could also consider renaming the parameter failed to success, with a default value of true, to avoid the negated check !failed, but this would require changes in the callers of this method.
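
A minimal sketch of what a `success` parameter could look like, using a hypothetical in-memory stand-in instead of Redis so the control flow can be read in isolation. `JobTracker`, its dictionary, and the `MaxFailures` constant are assumptions made for illustration; only the limit of 10 is taken from the code above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the Redis-backed queue.
public class JobTracker
{
    private const int MaxFailures = 10; // same limit as in the reviewed code

    private readonly Dictionary<string, int> _failedCounts = new Dictionary<string, int>();

    // Keys waiting to be picked up again by a worker.
    public Queue<string> JobQueue { get; } = new Queue<string>();

    // 'success' defaults to true, so the common case is simply Finish(key).
    public void Finish(string key, bool success = true)
    {
        if (success)
        {
            _failedCounts.Remove(key);
            return;
        }

        int count;
        _failedCounts.TryGetValue(key, out count); // count stays 0 if the key is unknown

        if (count >= MaxFailures)
        {
            // Too many failures: drop the job instead of re-queueing it.
            _failedCounts.Remove(key);
            return;
        }

        _failedCounts[key] = count + 1;
        JobQueue.Enqueue(key);
    }
}
```

Because the old default was `failed = false`, calls that rely on the default stay as they are; only call sites that pass the flag explicitly would need to change, for example from `Finish(key, true)` to `Finish(key, success: false)`.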

- You can remove the duplicated code where you have both synchronous and asynchronous versions of a method by letting the synchronous version call the asynchronous one, like so:

private Dictionary<RedisValue, RedisValue> GetJob()
{
    return Task.Run(GetJobAsync).Result;
}


See also: calling-async-method-synchronously
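
One detail worth knowing when blocking on a task this way: `.Result` reports failures wrapped in an `AggregateException`, whereas `GetAwaiter().GetResult()` rethrows the original exception. The sketch below demonstrates the difference with a hypothetical stand-in for `GetJobAsync` that always fails; `SyncOverAsyncDemo` and `CatchWith` are illustrative names, not part of the reviewed code.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncOverAsyncDemo
{
    // Hypothetical stand-in for GetJobAsync that always fails.
    private static async Task<string> GetJobAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("no job available");
    }

    // Runs the call and returns the type name of the exception the caller observes.
    public static string CatchWith(Func<string> call)
    {
        try
        {
            call();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    public static string ViaResult() =>
        CatchWith(() => Task.Run(() => GetJobAsync()).Result);

    public static string ViaGetResult() =>
        CatchWith(() => Task.Run(() => GetJobAsync()).GetAwaiter().GetResult());

    public static void Main()
    {
        Console.WriteLine(ViaResult());    // prints "AggregateException"
        Console.WriteLine(ViaGetResult()); // prints "InvalidOperationException"
    }
}
```

Either form blocks the calling thread until the task completes, so callers that can be made asynchronous are usually better served by awaiting the asynchronous method directly.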

Context

StackExchange Code Review Q#105560, answer score: 3
