From d42a1b852ad27ed86d86942917f7187d6480041e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 7 Dec 2019 10:17:40 +0000 Subject: [PATCH] Aggregate layout/naming consistency --- equinox-fc/Domain.Tests/AllocatorTests.fs | 30 ++++---- equinox-fc/Domain.Tests/TicketListTests.fs | 20 ++--- equinox-fc/Domain.Tests/TicketTests.fs | 32 ++++---- equinox-fc/Domain/Allocation.fs | 87 +++++++++++----------- equinox-fc/Domain/Allocator.fs | 37 ++++----- equinox-fc/Domain/Ticket.fs | 51 ++++++------- equinox-fc/Domain/TicketList.fs | 39 +++++----- 7 files changed, 150 insertions(+), 146 deletions(-) diff --git a/equinox-fc/Domain.Tests/AllocatorTests.fs b/equinox-fc/Domain.Tests/AllocatorTests.fs index fddfe5183..bca6d2fb3 100644 --- a/equinox-fc/Domain.Tests/AllocatorTests.fs +++ b/equinox-fc/Domain.Tests/AllocatorTests.fs @@ -15,45 +15,45 @@ type Result = let execute cmd state = match cmd with - | Commence (a,c) -> + | Commence (a, c) -> match decideCommence a c state with - | CommenceResult.Accepted, es -> Accepted,es - | CommenceResult.Conflict a, es -> Conflict a,es - | Complete (a,r) -> let es = decideComplete a r state in Accepted, es + | CommenceResult.Accepted, es -> Accepted, es + | CommenceResult.Conflict a, es -> Conflict a, es + | Complete (a, r) -> let es = decideComplete a r state in Accepted, es let [] properties c1 c2 = - let res,events = execute c1 Folds.initial - let state1 = Folds.fold Folds.initial events + let res, events = execute c1 Fold.initial + let state1 = Fold.fold Fold.initial events match c1, res, events, state1 with - | Commence (a,c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state -> + | Commence (a, c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state -> test <@ a = ea && c = ec && state = Some e @> | Complete _, Accepted, [], None -> () // Non-applicable Complete requests are simply ignored | _, res, l, _ -> test <@ List.isEmpty l && res = Accepted @> - let res,events = execute c2 state1 - let state2 = Folds.fold state1 events + let res, events = execute c2 state1 + let state2 = Fold.fold state1 events match state1, c2, res, events, state2 with // As per above, normal commence - | None, Commence (a,c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state -> + | None, Commence (a, c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state -> test <@ a = ea && c = ec && state = Some e @> // Idempotent accept if same allocationId - | Some active as s1, Commence (a,_), Accepted, [], s2 -> + | Some active as s1, Commence (a, _), Accepted, [], s2 -> test <@ s1 = s2 && active.allocationId = a @> // Conflict reports owner allocator - | Some active as s1, Commence (a2,_), Conflict a1, [], s2 -> + | Some active as s1, Commence (a2, _), Conflict a1, [], s2 -> test <@ s1 = s2 && a2 <> a1 && a1 = active.allocationId @> // Correct complete for same allocator is accepted - | Some active, Complete (a,r), Accepted, [Events.Completed { allocationId = ea; reason = er }], None -> + | Some active, Complete (a, r), Accepted, [Events.Completed { allocationId = ea; reason = er }], None -> test <@ er = r && ea = a && active.allocationId = a @> // Completes not for the same allocator are ignored - | Some active as s1, Complete (a,_), Accepted, [], s2 -> + | Some active as s1, Complete (a, _), Accepted, [], s2 -> test <@ active.allocationId <> a && s2 = s1 @> | _, _, res, l, _ -> test <@ List.isEmpty l && res = Accepted @> let [] ``codec can roundtrip`` event = - let ee = Events.codec.Encode(None,event) + let ee = Events.codec.Encode(None, event) let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain.Tests/TicketListTests.fs b/equinox-fc/Domain.Tests/TicketListTests.fs index c2d8ea059..5932f8805 100644 --- a/equinox-fc/Domain.Tests/TicketListTests.fs +++ b/equinox-fc/Domain.Tests/TicketListTests.fs @@ -5,13 +5,13 @@ open Swensen.Unquote open TicketList let [] properties c1 c2 = - let events = interpret c1 Folds.initial - let state1 = Folds.fold Folds.initial events + let events = interpret c1 Fold.initial + let state1 = Fold.fold Fold.initial events match c1, events, state1 with // Empty request -> no Event - | (_,[]), [], state -> + | (_, []), [], state -> test <@ Set.isEmpty state @> - | (a,t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state -> + | (a, t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state -> test <@ a = ea @> test <@ state = set t @> test <@ state = set et @> @@ -19,17 +19,17 @@ let [] properties c1 c2 = test <@ List.isEmpty l @> let events = interpret c2 state1 - let state2 = Folds.fold state1 events - test <@ Folds.fold state2 [Folds.snapshot state2] = state2 @> + let state2 = Fold.fold state1 events + test <@ Fold.fold state2 [Fold.snapshot state2] = state2 @> match state1, c2, events, state2 with // Empty request -> no Event, same state - | s1, (_,[]), [], state -> + | s1, (_, []), [], state -> test <@ state = s1 @> // Redundant request -> No Event, same state - | s1, (_,t), [], _ -> + | s1, (_, t), [], _ -> test <@ Set.isSuperset s1 (set t) @> // Two consecutive commands should both manifest in the state - | s1, (a,t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state -> + | s1, (a, t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state -> test <@ a = ea @> let et = Set et test <@ Set.isSuperset (set t) et @> @@ -40,6 +40,6 @@ let [] properties c1 c2 = test <@ List.isEmpty l @> let [] ``codec can roundtrip`` event = - let ee = Events.codec.Encode(None,event) + let ee = Events.codec.Encode(None, event) let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain.Tests/TicketTests.fs b/equinox-fc/Domain.Tests/TicketTests.fs index 32179871a..ffb05d87f 100644 --- a/equinox-fc/Domain.Tests/TicketTests.fs +++ b/equinox-fc/Domain.Tests/TicketTests.fs @@ -3,7 +3,7 @@ module TicketTests open FsCheck.Xunit open Swensen.Unquote open Ticket -open Ticket.Folds +open Ticket.Fold /// We want to generate Allocate requests with and without the same listId in some cases let (|MaybeSameCommands|) = function @@ -14,9 +14,9 @@ let (|MaybeSameCommands|) = function /// Explicitly generate sequences with the same allocator running twice or three times let (|MaybeSameIds|) = function | Choice1Of4 a -> a, a, a - | Choice2Of4 (a,b) -> a, a, b - | Choice3Of4 (a,b) -> a, b, b - | Choice4Of4 (a,b,c) -> a, b, c + | Choice2Of4 (a, b) -> a, a, b + | Choice3Of4 (a, b) -> a, b, b + | Choice4Of4 (a, b, c) -> a, b, c let (|Invariants|) = function // Revokes always succeed iff Unallocated @@ -36,47 +36,47 @@ let (|ReservedCases|_|) allocator = function test <@ a = allocator @> Some () // Revokes not by the owner are reported as successful, but we force the real owner to do the real relinquish - | (Reserved by | Allocated(by,_)), Revoke, true, [], _ -> + | (Reserved by | Allocated(by, _)), Revoke, true, [], _ -> test <@ by <> allocator @> Some () // Revokes succeed iff by the owner - | (Reserved by | Allocated(by,_)), Revoke, true, [Events.Revoked], Unallocated -> + | (Reserved by | Allocated(by, _)), Revoke, true, [Events.Revoked], Unallocated -> test <@ by = allocator @> Some () // Reservations can transition to Allocations as long as it's the same Allocator requesting - | Reserved a, Allocate l, true, [Events.Allocated { allocatorId = ea; listId = el }], Allocated (sa,sl) -> + | Reserved a, Allocate l, true, [Events.Allocated { allocatorId = ea; listId = el }], Allocated (sa, sl) -> test <@ a = allocator && a = ea && a = sa && l = el && l = sl @> Some() | _ -> None -let [] properties (MaybeSameIds (a1,a2,a3)) (MaybeSameCommands (c1,c2,c3)) = - let res, events = decide a1 c1 Folds.initial - let state1 = Folds.fold Folds.initial events +let [] properties (MaybeSameIds (a1, a2, a3)) (MaybeSameCommands (c1, c2, c3)) = + let res, events = decide a1 c1 Fold.initial + let state1 = Fold.fold Fold.initial events - match Folds.initial, c1, res, events, state1 with + match Fold.initial, c1, res, events, state1 with | _, Reserve, true, [Events.Reserved { allocatorId = a }], Reserved sa -> test <@ a = a1 && sa = a1 @> | Invariants -> () let res, events = decide a2 c2 state1 - let state2 = Folds.fold state1 events + let state2 = Fold.fold state1 events match state1, c2, res, events, state2 with | ReservedCases a2 -> () | Invariants -> () let res, events = decide a3 c3 state2 - let state3 = Folds.fold state2 events + let state3 = Fold.fold state2 events match state2, c3, res, events, state3 with // Idempotent allocate ignore - | Allocated (a,l), Allocate l3, true, [], _ -> + | Allocated (a, l), Allocate l3, true, [], _ -> test <@ a = a3 && l = l3 @> // Allocated -> Revoked - | Allocated (a,_), Revoke, true, [Events.Revoked], Unallocated -> + | Allocated (a, _), Revoke, true, [Events.Revoked], Unallocated -> test <@ a = a3 @> | ReservedCases a3 -> () | Invariants -> () let [] ``codec can roundtrip`` event = - let ee = Events.codec.Encode(None,event) + let ee = Events.codec.Encode(None, event) let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain/Allocation.fs b/equinox-fc/Domain/Allocation.fs index 9b6f61b1e..9ce07e7d5 100644 --- a/equinox-fc/Domain/Allocation.fs +++ b/equinox-fc/Domain/Allocation.fs @@ -34,7 +34,7 @@ module Events = let [] categoryId = "Allocation" let (|AggregateId|) id = Equinox.AggregateId(categoryId, AllocationId.toString id) -module Folds = +module Fold = type State = NotStarted | Running of States | Canceling of States | Completed and States = @@ -59,7 +59,7 @@ module Folds = let withRevoked (ToSet xs) x = { withKnown xs x with reserved = Set.difference x.reserved xs } let withReleasing (ToSet xs) x ={ withKnown xs x with releasing = x.releasing |> Set.union xs } // TODO let withAssigned listId x = // TODO - let decided,remaining = x.assigning |> List.partition (fun x -> x.listId = listId) + let decided, remaining = x.assigning |> List.partition (fun x -> x.listId = listId) let xs = seq { for x in decided do yield! x.ticketIds } { withRevoked xs x with assigning = remaining } let initial = NotStarted @@ -121,17 +121,17 @@ type ProcessState = | Cancelling of toAssign : Events.Allocated list * toRelease : TicketId list | Completed static member FromFoldState = function - | Folds.NotStarted -> + | Fold.NotStarted -> NotStarted - | Folds.Running e -> + | Fold.Running e -> match Set.toList e.reserved, e.assigning, Set.toList e.releasing, Set.toList e.unknown with | res, [], [], [] -> Idle (reserved = res) | res, ass, rel, tor -> Running (reserved = res, toAssign = ass, toRelease = rel, toReserve = tor) - | Folds.Canceling e -> + | Fold.Canceling e -> Cancelling (toAssign = e.assigning, toRelease = [yield! e.reserved; yield! e.unknown; yield! e.releasing]) - | Folds.Completed -> + | Fold.Completed -> Completed /// Updates recording attained progress @@ -146,26 +146,26 @@ let (|SetEmpty|_|) s = if Set.isEmpty s then Some () else None /// Map processed work to associated events that are to be recorded in the stream let decideUpdate update state = - let owned (s : Folds.States) = Set.union s.releasing (set <| seq { yield! s.unknown; yield! s.reserved }) + let owned (s : Fold.States) = Set.union s.releasing (set <| seq { yield! s.unknown; yield! s.reserved }) match state, update with - | (Folds.Completed | Folds.NotStarted), (Failed _|Reserved _|Assigned _|Revoked _) as x -> + | (Fold.Completed | Fold.NotStarted), (Failed _|Reserved _|Assigned _|Revoked _) as x -> failwithf "Folds.Completed or NotStarted cannot handle (Failed|Revoked|Assigned) %A" x - | (Folds.Running s|Folds.Canceling s), Reserved (ToSet xs) -> + | (Fold.Running s|Fold.Canceling s), Reserved (ToSet xs) -> match set s.unknown |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Reserved { ticketIds = Set.toArray changed }] - | (Folds.Running s|Folds.Canceling s), Failed (ToSet xs) -> + | (Fold.Running s|Fold.Canceling s), Failed (ToSet xs) -> match owned s |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Failed { ticketIds = Set.toArray changed }] - | (Folds.Running s|Folds.Canceling s), Revoked (ToSet xs) -> + | (Fold.Running s|Fold.Canceling s), Revoked (ToSet xs) -> match owned s |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Revoked { ticketIds = Set.toArray changed }] - | (Folds.Running s|Folds.Canceling s), Assigned listId -> + | (Fold.Running s|Fold.Canceling s), Assigned listId -> if s.assigning |> List.exists (fun x -> x.listId = listId) then [Events.Assigned { listId = listId }] else [] /// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events type private Accumulator() = let acc = ResizeArray() - member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function - | res, [] -> res,state - | res, [e] -> acc.Add e; res,Folds.evolve state e - | res, xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs) + member __.Ingest state : 'res * Events.Event list -> 'res * Fold.State = function + | res, [] -> res, state + | res, [e] -> acc.Add e; res, Fold.evolve state e + | res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs) member __.Accumulated = List.ofSeq acc /// Impetus provided to the Aggregate Service from the Process Manager @@ -175,78 +175,79 @@ type Command = | Cancel /// Apply updates, decide whether Command is applicable, emit state reflecting work to be completed to conclude the in-progress workflow (if any) -let sync (updates : Update seq, command : Command) (state : Folds.State) : (bool*ProcessState) * Events.Event list = +let sync (updates : Update seq, command : Command) (state : Fold.State) : (bool*ProcessState) * Events.Event list = let acc = Accumulator() (* Apply any updates *) let mutable state = state for x in updates do - let (),state' = acc.Ingest state ((),decideUpdate x state) + let (), state' = acc.Ingest state ((), decideUpdate x state) state <- state' (* Decide whether the Command is now acceptable *) - let accepted,state = + let accepted, state = acc.Ingest state <| match state, command with (* Ignore on the basis of being idempotent in the face of retries *) // TOCONSIDER how to represent that a request is being denied e.g. due to timeout vs due to being complete - | (Folds.Idle|Folds.Releasing _), Apply _ -> + | (Fold.Idle|Fold.Releasing _), Apply _ -> false, [] (* Defer; Need to allow current request to progress before it can be considered *) - | (Folds.Acquiring _|Folds.Releasing _), Commence _ -> + | (Fold.Acquiring _|Fold.Releasing _), Commence _ -> true, [] // TODO validate idempotent ? (* Ok on the basis of idempotency *) - | (Folds.Idle|Folds.Releasing _), Cancel -> + | (Fold.Idle|Fold.Releasing _), Cancel -> true, [] (* Ok; Currently idle, normal Commence request*) - | Folds.Idle, Commence tickets -> - true,[Events.Commenced { ticketIds = Array.ofList tickets }] + | Fold.Idle, Commence tickets -> + true, [Events.Commenced { ticketIds = Array.ofList tickets }] (* Ok; normal apply to distribute held tickets *) - | Folds.Acquiring s, Apply (assign,release) -> + | Fold.Acquiring s, Apply (assign, release) -> let avail = System.Collections.Generic.HashSet s.reserved let toAssign = [for a in assign -> { a with ticketIds = a.ticketIds |> Array.where avail.Remove }] - let toRelease = (Set.empty,release) ||> List.fold (fun s x -> if avail.Remove x then Set.add x s else s) + let toRelease = (Set.empty, release) ||> List.fold (fun s x -> if avail.Remove x then Set.add x s else s) true, [ for x in toAssign do if (not << Array.isEmpty) x.ticketIds then yield Events.Allocated x match toRelease with SetEmpty -> () | toRelease -> yield Events.Released { ticketIds = Set.toArray toRelease }] (* Ok, normal Cancel *) - | Folds.Acquiring _, Cancel -> + | Fold.Acquiring _, Cancel -> true, [Events.Cancelled] (* Yield outstanding processing requirements (if any), together with events accumulated based on the `updates` *) (accepted, ProcessState.FromFoldState state), acc.Accumulated -type Service internal (resolve, ?maxAttempts) = +type Service internal (log, resolve, ?maxAttempts) = - let log = Serilog.Log.ForContext() - let (|Stream|) (Events.AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + let resolve (Events.AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) - member __.Sync(allocationId,updates,command) : Async = - let (Stream stream) = allocationId - stream.Transact(sync (updates,command)) + member __.Sync(allocationId, updates, command) : Async = + let stream = resolve allocationId + stream.Transact(sync (updates, command)) + +let create resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 3) module EventStore = open Equinox.EventStore - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent let opt = Equinox.ResolveOption.AllowStale // We should be reaching Completed state frequently so no actual Snapshots should get written - fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy).Resolve(id,opt) - let create (context,cache) = - Service(resolve (context,cache)) + fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve(id, opt) + let create (context, cache) = + create (resolve (context, cache)) module Cosmos = open Equinox.Cosmos - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent let opt = Equinox.ResolveOption.AllowStale // TODO impl snapshots - let makeEmptyUnfolds events _state = events,[] - let accessStrategy = AccessStrategy.Custom (Folds.isOrigin,makeEmptyUnfolds) - fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve(id,opt) - let create (context,cache) = - Service(resolve (context,cache)) \ No newline at end of file + let makeEmptyUnfolds events _state = events, [] + let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, makeEmptyUnfolds) + fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt) + let create (context, cache) = + create (resolve (context, cache)) \ No newline at end of file diff --git a/equinox-fc/Domain/Allocator.fs b/equinox-fc/Domain/Allocator.fs index 50037d911..0ea9dcfd1 100644 --- a/equinox-fc/Domain/Allocator.fs +++ b/equinox-fc/Domain/Allocator.fs @@ -15,10 +15,10 @@ module Events = | Completed of Completed interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() - let [] categoryId = "Allocator" - let (|AggregateId|) id = Equinox.AggregateId(categoryId, AllocatorId.toString id) + let [] category = "Allocator" + let (|For|) id = Equinox.AggregateId(category, AllocatorId.toString id) -module Folds = +module Fold = type State = Events.Commenced option let initial = None @@ -29,46 +29,47 @@ module Folds = type CommenceResult = Accepted | Conflict of AllocationId -let decideCommence allocationId cutoff : Folds.State -> CommenceResult*Events.Event list = function +let decideCommence allocationId cutoff : Fold.State -> CommenceResult*Events.Event list = function | None -> Accepted, [Events.Commenced { allocationId = allocationId; cutoff = cutoff }] | Some { allocationId = tid } when allocationId = tid -> Accepted, [] // Accept replay idempotently | Some curr -> Conflict curr.allocationId, [] // Reject attempts at commencing overlapping transactions -let decideComplete allocationId reason : Folds.State -> Events.Event list = function +let decideComplete allocationId reason : Fold.State -> Events.Event list = function | Some { allocationId = tid } when allocationId = tid -> [Events.Completed { allocationId = allocationId; reason = reason }] | Some _ | None -> [] // Assume replay; accept but don't write -type Service internal (resolve, ?maxAttempts) = +type Service internal (log, resolve, maxAttempts) = - let log = Serilog.Log.ForContext() - let (|Stream|) (Events.AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + let resolve (Events.For id) = Equinox.Stream(log, resolve id, maxAttempts) member __.Commence(allocatorId, allocationId, cutoff) : Async = - let (Stream stream) = allocatorId + let stream = resolve allocatorId stream.Transact(decideCommence allocationId cutoff) member __.Complete(allocatorId, allocationId, reason) : Async = - let (Stream stream) = allocatorId + let stream = resolve allocatorId stream.Transact(decideComplete allocationId reason) +let create resolve = Service(Serilog.Log.ForContext(), resolve, 3) + module EventStore = open Equinox.EventStore - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent let opt = Equinox.ResolveOption.AllowStale - fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) - let create (context,cache) = - Service(resolve (context,cache)) + fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt) + let create (context, cache) = + create (resolve (context, cache)) module Cosmos = open Equinox.Cosmos - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent let opt = Equinox.ResolveOption.AllowStale - fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) - let create (context,cache) = - Service(resolve (context,cache)) \ No newline at end of file + fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt) + let create (context, cache) = + create (resolve (context, cache)) \ No newline at end of file diff --git a/equinox-fc/Domain/Ticket.fs b/equinox-fc/Domain/Ticket.fs index f4f0957a9..ebe74279e 100644 --- a/equinox-fc/Domain/Ticket.fs +++ b/equinox-fc/Domain/Ticket.fs @@ -12,10 +12,10 @@ module Events = | Revoked interface TypeShape.UnionContract.IUnionContract let codec = FsCodec.NewtonsoftJson.Codec.Create() - let [] categoryId = "Ticket" - let (|AggregateId|) id = Equinox.AggregateId(categoryId, TicketId.toString id) + let [] category = "Ticket" + let (|For|) id = Equinox.AggregateId(category, TicketId.toString id) -module Folds = +module Fold = type State = Unallocated | Reserved of by : AllocatorId | Allocated of by : AllocatorId * on : TicketListId let initial = Unallocated @@ -36,46 +36,47 @@ type Command = /// (but are not failures from an Allocator's perspective) | Revoke -let decide (allocator : AllocatorId) (command : Command) (state : Folds.State) : bool * Events.Event list = +let decide (allocator : AllocatorId) (command : Command) (state : Fold.State) : bool * Events.Event list = match command, state with - | Reserve, Folds.Unallocated -> true,[Events.Reserved { allocatorId = allocator }] // normal case -> allow+record - | Reserve, Folds.Reserved by when by = allocator -> true,[] // idempotently permit - | Reserve, (Folds.Reserved _ | Folds.Allocated _) -> false,[] // report failure, nothing to write - | Allocate list, Folds.Allocated (by,l) when by = allocator && l = list -> true,[] // idempotent processing - | Allocate list, Folds.Reserved by when by = allocator -> true,[Events.Allocated { allocatorId = allocator; listId = list }] // normal - | Allocate _, (Folds.Allocated _ | Folds.Unallocated | Folds.Reserved _) -> false,[] // Fail if someone else has reserved or allocated, or we are jumping straight to Allocated without Reserving first - | Revoke, Folds.Unallocated -> true,[] // idempotent handling - | Revoke, (Folds.Reserved by | Folds.Allocated (by,_)) when by = allocator -> true,[Events.Revoked] // release Reservation or Allocation - | Revoke, (Folds.Reserved _ | Folds.Allocated _ ) -> true,[] // NOTE we report success of achieving the intent (but, critically, we leave it to the actual owner to manage any actual revoke) + | Reserve, Fold.Unallocated -> true, [Events.Reserved { allocatorId = allocator }] // normal case -> allow+record + | Reserve, Fold.Reserved by when by = allocator -> true, [] // idempotently permit + | Reserve, (Fold.Reserved _ | Fold.Allocated _) -> false, [] // report failure, nothing to write + | Allocate list, Fold.Allocated (by, l) when by = allocator && l = list -> true, [] // idempotent processing + | Allocate list, Fold.Reserved by when by = allocator -> true, [Events.Allocated { allocatorId = allocator; listId = list }] // normal + | Allocate _, (Fold.Allocated _ | Fold.Unallocated | Fold.Reserved _) -> false, [] // Fail if someone else has reserved or allocated, or we are jumping straight to Allocated without Reserving first + | Revoke, Fold.Unallocated -> true, [] // idempotent handling + | Revoke, (Fold.Reserved by | Fold.Allocated (by, _)) when by = allocator -> true, [Events.Revoked] // release Reservation or Allocation + | Revoke, (Fold.Reserved _ | Fold.Allocated _ ) -> true, [] // NOTE we report success of achieving the intent (but, critically, we leave it to the actual owner to manage any actual revoke) -type Service internal (resolve, ?maxAttempts) = +type Service internal (log, resolve, maxAttempts) = - let log = Serilog.Log.ForContext() - let (|Stream|) (Events.AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + let resolve (Events.For id) = Equinox.Stream(log, resolve id, maxAttempts) /// Attempts to achieve the intent represented by `command`. High level semantics as per comments on Command (see decide for lowdown) /// `false` is returned if a competing allocator holds it (or we're attempting to jump straight to Allocated without first Reserving) member __.Sync(pickTicketId, allocator, command : Command) : Async = - let (Stream stream) = pickTicketId + let stream = resolve pickTicketId stream.Transact(decide allocator command) +let create resolve = Service(Serilog.Log.ForContext(), resolve, 3) + module EventStore = open Equinox.EventStore - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // because we only ever need the last event, we use the Equinox.EventStore access strategy that optimizes around that - Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve - let create (context,cache)= - Service(resolve (context,cache)) + Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let create (context, cache) = + create (resolve (context, cache)) module Cosmos = open Equinox.Cosmos - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // because we only ever need the last event to build the state, we feed the events we are writing // (there's always exactly one if we are writing), into the unfolds slot so a single point read with etag check gets us state in one trip - Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve - let create (context,cache) = - Service(resolve (context,cache)) \ No newline at end of file + Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let create (context, cache) = + create(resolve (context, cache)) \ No newline at end of file diff --git a/equinox-fc/Domain/TicketList.fs b/equinox-fc/Domain/TicketList.fs index 348e07384..a21e8265e 100644 --- a/equinox-fc/Domain/TicketList.fs +++ b/equinox-fc/Domain/TicketList.fs @@ -13,53 +13,54 @@ module Events = let [] categoryId = "TicketList" let (|AggregateId|) id = Equinox.AggregateId(categoryId, TicketListId.toString id) -module Folds = +module Fold = type State = Set let initial = Set.empty let evolve state = function - | Events.Allocated e -> (state,e.ticketIds) ||> Array.fold (fun m x -> Set.add x m) + | Events.Allocated e -> (state, e.ticketIds) ||> Array.fold (fun m x -> Set.add x m) | Events.Snapshotted e -> Set.ofArray e.ticketIds let fold : State -> Events.Event seq -> State = Seq.fold evolve let isOrigin = function Events.Snapshotted _ -> true | Events.Allocated _ -> false let snapshot state = Events.Snapshotted { ticketIds = Set.toArray state } -let interpret (allocatorId : AllocatorId, allocated : TicketId list) (state : Folds.State) : Events.Event list = +let interpret (allocatorId : AllocatorId, allocated : TicketId list) (state : Fold.State) : Events.Event list = match allocated |> Seq.except state |> Seq.distinct |> Seq.toArray with | [||] -> [] | news -> [Events.Allocated { allocatorId = allocatorId; ticketIds = news }] -type Service internal (resolve, ?maxAttempts) = +type Service internal (log, resolve, maxAttempts) = - let log = Serilog.Log.ForContext() - let (|Stream|) (Events.AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + let resolve (Events.AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts) - member __.Sync(pickListId,allocatorId,assignedTickets) : Async = - let (Stream stream) = pickListId - stream.Transact(interpret (allocatorId,assignedTickets)) + member __.Sync(pickListId, allocatorId, assignedTickets) : Async = + let stream = resolve pickListId + stream.Transact(interpret (allocatorId, assignedTickets)) + +let create resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 3) module EventStore = open Equinox.EventStore - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // while there are competing writers (which might cause us to have to retry a Transact and discover it is redundant), there is never a cost to being wrong let opt = Equinox.ResolveOption.AllowStale // we _could_ use this Access Strategy, but because we are only generally doing a single shot write, its unwarranted - // let accessStrategy = AccessStrategy.RollingSnapshots (Folds.isOrigin,Folds.snapshot) - fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy).Resolve(id,opt) - let create (context,cache) = - Service(resolve (context,cache)) + // let accessStrategy = AccessStrategy.RollingSnapshots (Folds.isOrigin, Folds.snapshot) + fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve(id, opt) + let create (context, cache) = + create (resolve (context, cache)) module Cosmos = open Equinox.Cosmos - let resolve (context,cache) = + let resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // while there are competing writers (which might cause us to have to retry a Transact and discover it is redundant), there is never a cost to being wrong let opt = Equinox.ResolveOption.AllowStale // we want reads and writes (esp idempotent ones) to have optimal RU efficiency so we go the extra mile to do snapshotting into the Tip - let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot) - fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve(id,opt) - let create (context,cache)= - Service(resolve (context,cache)) \ No newline at end of file + let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot) + fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt) + let create (context, cache)= + create (resolve (context, cache)) \ No newline at end of file