patternMinor
Event sourcing using functional programming
Viewed 0 times
sourcingprogrammingusingfunctionalevent
Problem
Is this a satisfactory implementation of event sourcing using functional programming?
module Account
(*Types*)
(** An event paired with the aggregate it was raised against. The aggregate
    type is a parameter, so it must be declared on the type itself. *)
type EventAggregate<'Aggregate> = { Event: Event; Aggregate: 'Aggregate }

(** Commands accepted by the account module; each one carries the
    event/account pair it refers to. *)
and Command =
    | Login of EventAggregate<Account>
    | Deposit of EventAggregate<Account>
    | Withdrawal of EventAggregate<Account>

(** Facts that have happened to an account. *)
and Event =
    | LoggedIn of Login
    | Deposited of decimal
    | Withdrew of decimal

(* Single-case wrappers so the three string fields of an account cannot be
   swapped by accident. *)
and AccountId = AccountId of string
and FirstName = FirstName of string
and LastName = LastName of string

(** Credentials supplied with a login. *)
and Login = { UserId: string; Password: string }

(** The aggregate: identity and holder name of an account. *)
and Account =
    { AccountId: AccountId
      FirstName: FirstName
      LastName: LastName }
(*Functions*)
(** Returns [repo] extended with a binding from the plan's event to the plan's
    aggregate. [repo] itself is not modified.

    [Map] needs both of its type arguments; the value type is left to
    inference and the plan's type is resolved from its record fields.

    NOTE(review): Map.Add replaces an existing binding for an equal key, so
    recording two equal events keeps only the most recent aggregate. *)
let appendTo (repo: Map<Event, _>) plan =
    repo.Add(plan.Event, plan.Aggregate)
(** Returns the account for [login]. Placeholder: [login] is not consulted and
    the same hard-coded account is returned for every call. *)
let getAccount login =
    let accountId = AccountId "myAccountId"
    let firstName = FirstName "Scott"
    let lastName = LastName "Nimrod"
    { AccountId = accountId; FirstName = firstName; LastName = lastName }
(** Applies [cmd] to [repo] and returns the resulting repository.
    Login and Deposit record their plan through [appendTo]; a Withdrawal is
    not recorded and hands [repo] back unchanged. *)
let interpret cmd repo =
    match cmd with
    | Login plan
    | Deposit plan -> appendTo repo plan
    | Withdrawal _ -> repo
(*Client*)
(* Sample credentials and the account looked up for them. *)
let login = { UserId = "MyUserId"; Password = "MyPassword" }
let account = login |> getAccount

(* One plan per event to record, both against the same account. *)
let loginPlan = { Event = LoggedIn login; Aggregate = account }
let depositPlan = { Event = Deposited 100m; Aggregate = account }
(* Starts from an empty repository and interprets the login command, then the
   deposit command. *)
let store =
    Map.empty
    |> interpret (Login loginPlan)
    |> interpret (Deposit depositPlan)

Solution
The key concept in event sourcing is having an 'updater' function that takes an event and an old state, and returns a new state. Let's capture that as a type. The type can be very generic:
The next thing to capture is your specific domain and its specific implementation of the 'updater' function:
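Finally, you need some stream of events. It can be in the form of some stream data type or an observable sequence. We will use an IObservable because it's idiomatic in the .NET world. With this stream of events, you can 'fold' over the events and regenerate your final state (your account) by applying the update function at each step.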
module Update =
    (**
      A function that takes the old state and an event, in that order, and
      returns the new state. Both the state type ['s] and the event type ['e]
      are parameters of the alias.
    *)
    type t<'s, 'e> = 's -> 'e -> 's

The next thing to capture is your specific domain and its specific implementation of the 'updater' function:
module Account =
    (** Account state rebuilt from events. *)
    type t = { logged_in : bool; balance : decimal }

    (** Events that can be applied to an account. *)
    type event =
        | Login of userid:string * passwd:string
        | Deposit of decimal
        | Withdraw of decimal

    (** Starting state: not logged in, zero balance. *)
    let empty = { logged_in = false; balance = 0m }

    (** Credential check. Placeholder: accepts every userid/password pair. *)
    let login_valid userid passwd = true // <- TBD

    (** Applies one event to the state [t] and returns the new state.
        A Login sets [logged_in] from [login_valid]; Deposit and Withdraw
        adjust the balance only while logged in; any other combination
        returns the state unchanged. *)
    let update t event =
        match event, t with
        | Login (userid, passwd), _ ->
            { t with logged_in = login_valid userid passwd }
        | Deposit amt, { logged_in = true; balance = balance } ->
            { t with balance = balance + amt }
        | Withdraw amt, { logged_in = true; balance = balance } ->
            { t with balance = balance - amt }
        | _, t -> t
    (* val update : Update.t<t, event> *)

Finally, you need some stream of events. It can be in the form of some stream data type or an observable sequence. We will use an
IObservable because it's idiomatic in the .Net world. With this stream of events, you can 'fold' over the events and regenerate your final state (your account) by applying the update function at each step.

module Main =
    open System.Reactive.Linq

    (* For example only; real stream will be built differently. *)
    let acct_events =
        (* Each sample event becomes its own single-item observable; the
           observables are then concatenated in array order. *)
        let sources =
            [| Account.Login ("bob", "pass")
               Account.Deposit 1000m
               Account.Withdraw 500m |]
            |> Array.map (fun ev -> Observable.Return(ev))
        Observable.Concat(sources)

    (* Folds the events into an account, starting from Account.empty and
       applying Account.update at each step. *)
    let acct =
        Observable.Aggregate(acct_events, Account.empty, fun state ev -> Account.update state ev)
    (** val acct : IObservable<Account.t> *)

Code Snippets
module Update =
    (**
      A function that takes the old state and an event, in that order, and
      returns the new state.
    *)
    type t<'s, 'e> = 's -> 'e -> 's

module Account =
    (** Account state rebuilt from events. *)
    type t = { logged_in : bool; balance : decimal }

    (** Events that can be applied to an account. *)
    type event =
        | Login of userid:string * passwd:string
        | Deposit of decimal
        | Withdraw of decimal

    (** Starting state: not logged in, zero balance. *)
    let empty = { logged_in = false; balance = 0m }

    (** Credential check. Placeholder: accepts every userid/password pair. *)
    let login_valid userid passwd = true // <- TBD

    (** Applies one event to the state [t] and returns the new state.
        A Login sets [logged_in] from [login_valid]; Deposit and Withdraw
        adjust the balance only while logged in; any other combination
        returns the state unchanged. *)
    let update t event =
        match event, t with
        | Login (userid, passwd), _ ->
            { t with logged_in = login_valid userid passwd }
        | Deposit amt, { logged_in = true; balance = balance } ->
            { t with balance = balance + amt }
        | Withdraw amt, { logged_in = true; balance = balance } ->
            { t with balance = balance - amt }
        | _, t -> t
    (* val update : Update.t<t, event> *)

module Main =
    open System.Reactive.Linq

    (* For example only; real stream will be built differently. *)
    let acct_events =
        Observable.Concat(
            [| Observable.Return(Account.Login ("bob", "pass"))
               Observable.Return(Account.Deposit 1000m)
               Observable.Return(Account.Withdraw 500m) |])

    (* Folds the events into an account, starting from Account.empty and
       applying Account.update at each step. *)
    let acct =
        Observable.Aggregate(acct_events, Account.empty, Account.update)
    (** val acct : IObservable<Account.t> *)

Context
StackExchange Code Review Q#155065, answer score: 5
Revisions (0)
No revisions yet.