Command Dispatcher for messaging using MailboxProcessor

Problem

This code is intended to process commands in a CQRS+Messaging system. It parallelizes message processing to the degree determined by the msgToIdFn parameter: that function extracts the target ID from the command message, and all messages with the same ID are routed to the same agent. Agents are created automatically for each ID as needed and destroyed when their inbox empties.

Right now runFn is expected to execute any message. It will likely contain some match ... with routing that calls the appropriate module with the appropriate parameters and services.

The comments are verbose because I am learning and also explaining how it works to my team. I can trim them down if too distracting.

I'm looking for possible improvements to code style, missed edge cases, and efficiency. For example, I am returning Some agents in order to continue the main dispatcher loop or None to stop it -- that doesn't seem quite right to me. See other Known Issues at the bottom.

```
module MessageDispatcher

// the functionality exposed by starting a dispatcher
type MessageDispatcher<'message, 'result> = {
    Dispatch:'message -> Async<'result>;
    Shutdown: unit -> unit
}

// the things that the agents can be instructed to do
type private AgentMessage<'message> =
    | Run of 'message
    | Stop

// the things that the dispatcher can be instructed to do
type private DispatcherMessage<'message, 'key> =
    | Dispatch of 'message
    | CompleteRun of 'key
    | Shutdown

// it made sense to make MessageCount mutable
// otherwise, code would have to create a new record, remove old from agentsMap, add new to agentMap... every time
type private AgentCounter<'message, 'result> = {
    mutable MessageCount: int;
    Agent: MailboxProcessor<AgentMessage<'message> * ('result -> unit)>
}

/// Start the dispatcher
/// runFn is how you plug in execution code
///     signature: 'message -> 'result
/// msgToIdFn extracts the id from the message, so it can be linked to a specific agent
///     signature: 'message -> 'key
/// sysFailF
```

Solution

// the functionality exposed by starting a dispatcher
type MessageDispatcher<'message, 'result> = {
    Dispatch:'message -> Async<'result>;
    Shutdown: unit -> unit
}

…

// this sends a dispatch message to the dispatcher agent
let dispatchFn (msg:'message) =
    let message = Dispatch msg // create a dispatch message from the given message
    // send the message and reply channel
    dispatchAgent.PostAndAsyncReply(fun replyChannel -> (message, replyChannel.Reply))
    // the reply channel allows callers to get a response back

// this sends the shutdown message to the dispatcher agent
let shutdownFn () = dispatchAgent.Post(Shutdown, ignore)

// this exposes the public functions that this dispatcher supports
let dispatcher = {Dispatch = dispatchFn; Shutdown = shutdownFn}

// return the dispatcher
dispatcher


This looks like a functional way of emulating OOP. But F# does support OOP directly, so you should probably use that:

type IMessageDispatcher<'message, 'result> =
    abstract Dispatch:'message -> Async<'result>
    abstract Shutdown: unit -> unit

…

// return the dispatcher
{ new IMessageDispatcher<_, _> with
    member this.Dispatch(msg) =
        let message = Dispatch msg // create a dispatch message from the given message
        // send the message and reply channel
        dispatchAgent.PostAndAsyncReply(fun replyChannel -> (message, replyChannel.Reply))
        // the reply channel allows callers to get a response back

    member this.Shutdown() =
        dispatchAgent.Post(Shutdown, ignore)
}
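
To see the interface version end to end, here is a minimal self-contained sketch. It is not your dispatcher: it assumes a single agent with no per-id routing, a hypothetical start function, and a DispatcherMessage trimmed to two cases. Only the object expression at the end is the point.

```fsharp
type IMessageDispatcher<'message, 'result> =
    abstract Dispatch: 'message -> Async<'result>
    abstract Shutdown: unit -> unit

// Simplified stand-in for the original DispatcherMessage (no CompleteRun case).
type DispatcherMessage<'message> =
    | Dispatch of 'message
    | Shutdown

// Hypothetical start function: one agent handles every message in order.
let start (runFn: 'message -> 'result) : IMessageDispatcher<'message, 'result> =
    let dispatchAgent =
        MailboxProcessor.Start(fun (inbox: MailboxProcessor<DispatcherMessage<'message> * ('result -> unit)>) ->
            let rec loop () = async {
                let! (message, reply) = inbox.Receive()
                match message with
                | Dispatch msg ->
                    reply (runFn msg)   // answer the caller
                    return! loop ()
                | Shutdown -> return () // stop by not recursing
            }
            loop ())
    // return the dispatcher as an object implementing the interface
    { new IMessageDispatcher<'message, 'result> with
        member this.Dispatch(msg) =
            dispatchAgent.PostAndAsyncReply(fun replyChannel -> (Dispatch msg, replyChannel.Reply))
        member this.Shutdown() =
            dispatchAgent.Post(Shutdown, ignore) }
```

Callers then use it like any other object, for example start (fun x -> x + 1) followed by Dispatch(41) |> Async.RunSynchronously.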


/// Start the dispatcher
/// runFn is how you plug in execution code
///     signature: 'message -> 'result
/// msgToIdFn extracts the id from the message, so it can be linked to a specific agent
///     signature: 'message -> 'key
/// sysFailFn wraps an unhandled exception in the caller's desired result type
///     signature: exn -> 'result


Why are you duplicating the signature here? I don't see any reason for that.

Also, consider using XML documentation comments for this kind of documentation. (Though I'm not sure the tooling is good enough to make that worth it.)
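
As a rough illustration, XML documentation comments on a hypothetical start function could look like the sketch below. The parameter names and descriptions come from your comments; the function name and its placeholder body are assumptions made only so that the example compiles. With the types annotated in the signature, the comments no longer have to repeat them.

```fsharp
/// <summary>Starts the dispatcher.</summary>
/// <param name="runFn">How you plug in execution code.</param>
/// <param name="msgToIdFn">Extracts the id from the message, so it can be linked to a specific agent.</param>
/// <param name="sysFailFn">Wraps an unhandled exception in the caller's desired result type.</param>
let start (runFn: 'message -> 'result) (msgToIdFn: 'message -> 'key) (sysFailFn: exn -> 'result) =
    // Placeholder body (assumption): runs each message synchronously, no agents involved.
    fun (msg: 'message) ->
        let _id = msgToIdFn msg // a real dispatcher would pick the agent by this id
        try runFn msg with ex -> sysFailFn ex
```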

msgToIdFn:'message -> 'key


Try not to use abbreviations in names; they make the names harder to read.

// function to create a new agent
let createAgent runCompleted = 
    // create and start a new mailbox processor
    MailboxProcessor.Start


You're right, your comments can be too verbose. Use comments when something is unclear or needs explanation.

runCompleted() // notify completion of this message
return! loop () // continue to run


Why are you calling unit-taking functions sometimes without a space (so it looks like a C-like function call) and sometimes with one (so it looks like a functional-style call with unit as an argument)? I think you should choose one or the other and stick with it.

// the things that the agents can be instructed to do
type private AgentMessage =
    | Run of 'message
    | Stop


Since Stop doesn't need a reply, consider incorporating the reply function directly into AgentMessage:

type private AgentMessage =
    | Run of 'message * ('result -> unit)
    | Stop


The same approach could be applied to DispatcherMessage too.
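
A minimal sketch of what that might look like for DispatcherMessage, assuming only Dispatch needs to reply. The startDispatchAgent and dispatch helpers are hypothetical, and the loop is heavily simplified (no per-key agents, and CompleteRun is posted directly rather than by an agent):

```fsharp
type DispatcherMessage<'message, 'key, 'result> =
    | Dispatch of 'message * ('result -> unit) // only this case carries a reply function
    | CompleteRun of 'key
    | Shutdown

// Hypothetical, simplified dispatcher loop used only to show the message shape.
let startDispatchAgent (runFn: 'message -> 'result) (msgToIdFn: 'message -> 'key) =
    MailboxProcessor.Start(fun (inbox: MailboxProcessor<DispatcherMessage<'message, 'key, 'result>>) ->
        let rec loop () = async {
            let! message = inbox.Receive()
            match message with
            | Dispatch (msg, reply) ->
                reply (runFn msg)
                // assumption: signal completion for this message's id
                inbox.Post(CompleteRun (msgToIdFn msg))
                return! loop ()
            | CompleteRun _key ->
                // the real code would update the per-key bookkeeping here
                return! loop ()
            | Shutdown -> return ()
        }
        loop ())

// Posting no longer needs a placeholder reply function for Shutdown.
let dispatch (agent: MailboxProcessor<DispatcherMessage<'message, 'key, 'result>>) (msg: 'message) =
    agent.PostAndAsyncReply(fun replyChannel -> Dispatch (msg, replyChannel.Reply))
```

With this shape, shutting down becomes agent.Post Shutdown instead of agent.Post(Shutdown, ignore).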

// run the msgToKey function to get the id out of the message
let id = msgToIdFn msg


This shows one reason why too many comments can hurt: when you change code, you need to make sure you also modify the corresponding comment. This specific comment is wrong (it refers to msgToKey, but the function is called msgToIdFn) and useless (since it just repeats what the code says).

match agentCounter.MessageCount with
| 0 -> decommission id agentCounter.Agent agentMap // immediately decommission
| _ -> agentMap // return the same agent map -- nothing changed


You don't need to use match when if would work too:

if agentCounter.MessageCount = 0
then decommission id agentCounter.Agent agentMap // immediately decommission
else agentMap // return the same agent map -- nothing changed

Context

StackExchange Code Review Q#83696, answer score: 2
