patterncsharpMinor
Review an asynchronous/message-oriented library actor
Problem
I'd like to get input on an F# actor that coordinates receives around a blocking message buffer. The actor is a piece of code that continuously tries to fetch messages from Azure Service Bus.
```
(*
Copyright 2012 Henrik Feldt
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
*)
namespace MassTransit.Async
open Microsoft.ServiceBus
open Microsoft.ServiceBus.Messaging
open FSharp.Control
open System
open System.Threading
open System.Runtime.CompilerServices
open System.Collections.Concurrent
open MassTransit.AzureServiceBus
open MassTransit.Async.Queue
open MassTransit.Async.AsyncRetry
type internal Agent = AutoCancelAgent
/// communication with the worker agent
type RecvMsg =
  Start
  | Pause
  | Halt of AsyncReplyChannel
  | SubscribeQueue of QueueDescription * Concurrency
  | UnsubscribeQueue of QueueDescription
  | SubscribeTopic of TopicDescription * Concurrency
  | UnsubscribeTopic of TopicDescription
/// concurrently outstanding asynchronous requests (workers)
and Concurrency = uint32
/// State-keeping structure, mapping a description to a pair of cancellation token source and
/// receiver set list. The CancellationTokenSource can be used to stop the subscription that
/// it corresponds to.
type WorkerState =
{ QSubs : Map;
TSubs : Map Async) CancellationTokenSource ReceiverSet list> }
/// A pair of a messaging factory and a list of message receivers that
/// were created from that messaging factory.
and ReceiverSet = Pair of MessagingFactory * MessageReceiver list
type ReceiverDe
```
Solution
Disclaimer: I know some OCaml and a bit of Erlang, but I have never implemented an F# actor before. I also read the async paper you mentioned in the comments (apart from "Semantics" and "Implementation").
Style
Indentation
```
type RecvMsg =
  Start
  | Pause
```
You should indent Start like this:
```
type RecvMsg =
  | Start
  | Pause
```
The syntax allows you to omit the initial pipe, but it's because of one-liners like
```
type RecvMsg = Start | Pause | ...
```
Declaring types
```
/// concurrently outstanding asynchronous requests (workers)
and Concurrency = uint32
```
You should use `and` only for mutually recursive types. Concurrency could as well be defined just above RecvMsg.
Returning unit
`else ()` is implicit and not needed.
Pattern matching
```
let (Pair(mf, rs)) = pair
```
Why don't you `match pair with Pair(mf, rs) -> ...`?
Comments
Your comments are good! I especially liked the comment of the main agent, which helps to understand the state machine.
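The style points in this section can be collected into one small sketch. It is not the code from the question: `QueueDescription` is a hypothetical stand-in record so that the block compiles without the Service Bus assemblies, `ReceiverSet` uses simplified payload types, and `receiverCount`, `factoryName` and `warnIfEmpty` are names invented for illustration.

```fsharp
// Hypothetical stand-in for Microsoft.ServiceBus.Messaging.QueueDescription,
// so that the sketch compiles without the Service Bus assemblies.
type QueueDescription = { Path : string }

/// Concurrently outstanding asynchronous requests (workers).
/// Declared before RecvMsg, so a plain `type` is enough and `and` is not needed.
type Concurrency = uint32

/// Every case, including the first, starts with a pipe.
type RecvMsg =
  | Start
  | Pause
  | SubscribeQueue of QueueDescription * Concurrency

/// Single-case union shaped like ReceiverSet, with simplified (assumed) payload types.
type ReceiverSet = Pair of string * string list

/// Deconstruction with an explicit match, as the review suggests.
let receiverCount (pair : ReceiverSet) =
  match pair with
  | Pair (_, rs) -> List.length rs

/// Deconstruction directly in a let binding, as in the question.
let factoryName (pair : ReceiverSet) =
  let (Pair (mf, _)) = pair
  mf

/// An `if` without `else` already has type unit, so `else ()` can be dropped.
let warnIfEmpty (warn : string -> unit) (pair : ReceiverSet) =
  if receiverCount pair = 0 then warn "no receivers"
```

For a single-case union such as `Pair`, both deconstruction forms are complete patterns, so choosing between them is mostly a matter of taste.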
Code
Tail Recursion
Section 2.2 of the async paper mentions a way to define `count` that is elegant, concise and efficient. You should consider replacing the ugly `while true` loops with nice tail recursive functions. :)
States
I don't like all those "in-between" states, they make your actor look super complicated and don't seem natural. I have little experience with actors, so I can't tell if this is recommended or not.
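As a minimal sketch of the tail-recursion suggestion, and of one common way to keep agent states readable, each state can be written as its own recursive async function, with the loop state passed as an argument instead of being mutated inside a `while` loop. This is not the agent from the question: the `Msg` type, its `Work` case and the running total are assumptions made up for illustration, and a plain `MailboxProcessor` is used in place of `AutoCancelAgent`.

```fsharp
// Hypothetical message type; only Start, Pause and Halt echo the question's RecvMsg.
type Msg =
  | Start
  | Pause
  | Work of int
  | Halt of AsyncReplyChannel<int>

let agent =
  MailboxProcessor<Msg>.Start(fun inbox ->
    // Each state is a function; `return!` makes the transition a tail call,
    // so the loop runs without growing the stack.
    let rec paused total = async {
      let! msg = inbox.Receive()
      match msg with
      | Start -> return! running total
      | Halt reply -> reply.Reply total          // reply and stop looping
      | Pause | Work _ -> return! paused total } // work is ignored while paused
    and running total = async {
      let! msg = inbox.Receive()
      match msg with
      | Work n -> return! running (total + n)
      | Pause -> return! paused total
      | Halt reply -> reply.Reply total          // reply and stop looping
      | Start -> return! running total }
    paused 0)

// Usage: messages are processed in the order they were posted.
agent.Post Start
agent.Post (Work 2)
agent.Post Pause
agent.Post (Work 40)   // dropped, the agent is paused
agent.Post Start
agent.Post (Work 3)
let total = agent.PostAndReply(fun channel -> Halt channel)
```

Here `total` ends up as 5, because the `Work 40` message arrives while the agent is paused. Whether this shape removes the need for the "in-between" states depends on what those states guard against, which the sketch does not try to answer.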
Context
StackExchange Code Review Q#9605, answer score: 2