-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathAzureTableJoinStorage.fs
More file actions
121 lines (105 loc) · 5.9 KB
/
AzureTableJoinStorage.fs
File metadata and controls
121 lines (105 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright (c) Microsoft Corporation.
/// An Azure Table implementation of the Agent Join storage
/// This can be used to run agent state machines with fork and join
/// backed up by an Azure Table.
module Microsoft.FSharpLu.Actor.AzureTableJoinStorage
open Microsoft.Azure.Cosmos
open Microsoft.FSharpLu.Actor.StateMachine
open Microsoft.FSharpLu.Actor.StateMachine.Agent
open Microsoft.FSharpLu
open Microsoft.FSharpLu.ErrorHandling
open Microsoft.FSharpLu.Azure.Request.ErrorHandling
open System.Collections.Generic
/// Instantiate an implementation of Agent.Join.StorageInterface
/// backed up by an Azure Table
let newStorage<'m>
(storage: Table.CloudStorageAccount)
(tableName: string)
(retryInterval: System.TimeSpan)
(totalTimeout: System.TimeSpan)
(agentId: string) /// Customer identifier that gets recorded in every entry in Azure Table
: Async<Agent.Join.IStorage<'m>> =
async {
let tableClient = Table.CloudTableClient(storage.TableStorageUri, storage.Credentials)
let table = tableClient.GetTableReference(tableName)
let! _ = table.CreateIfNotExistsAsync() |> Async.AwaitTask
let retrieve (joinId:JoinId) =
async {
let partitionKey = joinId.timestamp.Ticks.ToString("D19")
let rowKey = joinId.guid.ToString()
let retrieve = Table.TableOperation.Retrieve(partitionKey, rowKey)
let! result = table.ExecuteAsync(retrieve) |> Async.AwaitTask
match Option.ofObj result.Result with
| None -> return failwithf "Could not find entry with id %A" joinId
| Some r -> return r :?> Table.DynamicTableEntity
}
let entityValuesToJoinEntry (properties: IDictionary<string, Table.EntityProperty>): Agent.Join.Entry<'m> =
let inline deserialize (key: string): 'a =
properties.[key].StringValue |> Json.Default.deserialize<'a>
{
whenAllSubscribers = deserialize "whenAllSubscribers"
whenAnySubscribers = deserialize "whenAnySubscribers"
status = deserialize "status"
childrenStatuses = deserialize "childrenStatuses"
parent = deserialize "parent"
created = deserialize "created"
modified = deserialize "modified"
}
let joinEntryToEntityValues (joinEntry: Agent.Join.Entry<'m>) =
dict [
"parent", Table.EntityProperty(Json.Default.serialize joinEntry.parent)
"childrenStatuses", Table.EntityProperty(Json.Default.serialize joinEntry.childrenStatuses)
"status", Table.EntityProperty(Json.Default.serialize joinEntry.status)
"whenAnySubscribers", Table.EntityProperty(Json.Default.serialize joinEntry.whenAnySubscribers)
"whenAllSubscribers", Table.EntityProperty(Json.Default.serialize joinEntry.whenAllSubscribers)
"created", Table.EntityProperty(Json.Default.serialize joinEntry.created)
"modified", Table.EntityProperty(Json.Default.serialize joinEntry.modified)
"agentId", Table.EntityProperty(agentId)
]
return
{
new Agent.Join.IStorage<_> with
member __.add joinId joinEntry =
async {
let partitionKey = joinId.timestamp.Ticks.ToString("D19")
let rowKey = joinId.guid.ToString()
let values = joinEntryToEntityValues joinEntry
let entity = Table.DynamicTableEntity(partitionKey, rowKey, "*", values)
let tableOperation = Table.TableOperation.Insert(entity)
let! _ = table.ExecuteAsync(tableOperation) |> Async.AwaitTask
return ()
}
member __.update joinId doEntryUpdate =
let start = System.DateTime.UtcNow
let rec doUpdate () =
async {
let spent = System.DateTime.UtcNow - start
if spent > totalTimeout then
failwithf "Update: Timed out after trying for %A (totalTimeout: %A)" spent totalTimeout
let! r = retrieve joinId
let joinEntry = entityValuesToJoinEntry r.Properties
let updatedJoinEntry = doEntryUpdate joinEntry
let partitionKey = joinEntry.created.Ticks.ToString("D19")
let rowKey = joinId.guid.ToString()
let values = joinEntryToEntityValues updatedJoinEntry
let entity = Table.DynamicTableEntity(partitionKey, rowKey, r.ETag, values)
let tableOperation = Table.TableOperation.Replace(entity)
try
let! _ = table.ExecuteAsync(tableOperation) |> Async.AwaitTask
return updatedJoinEntry
with
| HttpCommunication.Client.TooManyRequestsException(msg) ->
do! Async.Sleep (int retryInterval.TotalMilliseconds)
return! doUpdate ()
| IsAggregateOf (SomeStorageException System.Net.HttpStatusCode.PreconditionFailed) e ->
do! Async.Sleep (int retryInterval.TotalMilliseconds)
return! doUpdate ()
}
doUpdate ()
member __.get joinId =
async {
let! r = retrieve joinId
return entityValuesToJoinEntry r.Properties
}
}
}