Skip to content

Commit cc5330e

Browse files
authored
Adding mutex locks in cache to prevent race conditions (#185)
1 parent 5f7ccd5 commit cc5330e

File tree

5 files changed

+237
-120
lines changed

5 files changed

+237
-120
lines changed

internal/reporter/downstream_reporter/cache/cache.go

Lines changed: 166 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -3,105 +3,116 @@ package cache
33
import (
44
b64 "encoding/base64"
55
"errors"
6-
"reflect"
7-
"soarca/logger"
6+
"fmt"
7+
"slices"
88
"soarca/models/cacao"
99
cache_report "soarca/models/cache"
1010
itime "soarca/utils/time"
11+
"sync"
1112
"time"
1213

1314
"github.com/google/uuid"
1415
)
1516

16-
var component = reflect.TypeOf(Cache{}).PkgPath()
17-
var log *logger.Log
18-
19-
func init() {
20-
log = logger.Logger(component, logger.Info, "", logger.Json)
21-
}
22-
2317
const MaxExecutions int = 10
2418

2519
type Cache struct {
2620
Size int
2721
timeUtil itime.ITime
2822
Cache map[string]cache_report.ExecutionEntry // Cached up to max
2923
fifoRegister []string // Used for O(1) FIFO cache management
24+
mutex sync.Mutex
3025
}
3126

3227
func New(timeUtil itime.ITime, maxExecutions int) *Cache {
3328
return &Cache{
3429
Size: maxExecutions,
3530
Cache: make(map[string]cache_report.ExecutionEntry),
3631
timeUtil: timeUtil,
32+
mutex: sync.Mutex{},
3733
}
3834
}
3935

36+
// ############################### Atomic cache access operations (mutex-protection)
37+
38+
func (cacheReporter *Cache) getAllExecutions() ([]cache_report.ExecutionEntry, error) {
39+
executions := make([]cache_report.ExecutionEntry, 0)
40+
// NOTE: fetched via fifo register key reference as is ordered array,
41+
// this is needed to test and report back ordered executions stored
42+
43+
// Lock
44+
cacheReporter.mutex.Lock()
45+
defer cacheReporter.mutex.Unlock()
46+
for _, executionEntryKey := range cacheReporter.fifoRegister {
47+
// NOTE: cached executions are passed by reference, so they must not be modified
48+
entry, ok := cacheReporter.Cache[executionEntryKey]
49+
if !ok {
50+
// Unlock
51+
return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch")
52+
}
53+
executions = append(executions, entry)
54+
}
55+
56+
// Unlocked
57+
return executions, nil
58+
}
59+
4060
func (cacheReporter *Cache) getExecution(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {
61+
4162
executionKeyStr := executionKey.String()
63+
// No need for mutex as is one-line access
4264
executionEntry, ok := cacheReporter.Cache[executionKeyStr]
65+
4366
if !ok {
44-
err := errors.New("execution is not in cache")
45-
log.Warning("execution is not in cache. consider increasing cache size.")
67+
err := errors.New("execution is not in cache. consider increasing cache size")
4668
return cache_report.ExecutionEntry{}, err
47-
// TODO Retrieve from database
69+
// TODO Retrieve from database and push to cache
4870
}
4971
return executionEntry, nil
5072
}
51-
func (cacheReporter *Cache) getExecutionStep(executionKey uuid.UUID, stepKey string) (cache_report.StepResult, error) {
52-
executionEntry, err := cacheReporter.getExecution(executionKey)
53-
if err != nil {
54-
return cache_report.StepResult{}, err
55-
}
56-
executionStep, ok := executionEntry.StepResults[stepKey]
57-
if !ok {
58-
err := errors.New("execution step is not in cache")
59-
return cache_report.StepResult{}, err
60-
}
61-
return executionStep, nil
62-
}
6373

6474
// Adding executions in FIFO logic
65-
func (cacheReporter *Cache) addExecution(newExecutionEntry cache_report.ExecutionEntry) error {
75+
func (cacheReporter *Cache) addExecutionFIFO(newExecutionEntry cache_report.ExecutionEntry) error {
6676

6777
if !(len(cacheReporter.fifoRegister) == len(cacheReporter.Cache)) {
6878
return errors.New("cache fifo register and content are desynchronized")
6979
}
7080

7181
newExecutionEntryKey := newExecutionEntry.ExecutionId.String()
7282

83+
// Lock
84+
cacheReporter.mutex.Lock()
85+
defer cacheReporter.mutex.Unlock()
86+
87+
if _, ok := cacheReporter.Cache[newExecutionEntryKey]; ok {
88+
return errors.New("there is already an execution in the cache with the same execution id")
89+
}
7390
if len(cacheReporter.fifoRegister) >= cacheReporter.Size {
91+
7492
firstExecution := cacheReporter.fifoRegister[0]
7593
cacheReporter.fifoRegister = cacheReporter.fifoRegister[1:]
7694
delete(cacheReporter.Cache, firstExecution)
77-
7895
cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
7996
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry
97+
8098
return nil
99+
// Unlocked
81100
}
82-
83101
cacheReporter.fifoRegister = append(cacheReporter.fifoRegister, newExecutionEntryKey)
84102
cacheReporter.Cache[newExecutionEntryKey] = newExecutionEntry
85-
return nil
86-
}
87103

88-
func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error {
89-
newExecutionEntry := cache_report.ExecutionEntry{
90-
ExecutionId: executionId,
91-
PlaybookId: playbook.ID,
92-
Started: cacheReporter.timeUtil.Now(),
93-
Ended: time.Time{},
94-
StepResults: map[string]cache_report.StepResult{},
95-
Status: cache_report.Ongoing,
96-
}
97-
err := cacheReporter.addExecution(newExecutionEntry)
98-
if err != nil {
99-
return err
100-
}
101104
return nil
105+
// Unlocked
102106
}
103107

104-
func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error {
108+
func (cacheReporter *Cache) upateEndExecutionWorkflow(executionId uuid.UUID, workflowError error) error {
109+
// The cache should stay locked for the whole modification period
110+
// in order to prevent e.g. the execution data being popped-out due to FIFO
111+
// while its status or some of its steps are being updated
112+
113+
// Lock
114+
cacheReporter.mutex.Lock()
115+
defer cacheReporter.mutex.Unlock()
105116

106117
executionEntry, err := cacheReporter.getExecution(executionId)
107118
if err != nil {
@@ -118,70 +129,64 @@ func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook ca
118129
cacheReporter.Cache[executionId.String()] = executionEntry
119130

120131
return nil
132+
// Unlocked
121133
}
122134

123-
func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error {
135+
func (cacheReporter *Cache) addStartExecutionStep(executionId uuid.UUID, newStepData cache_report.StepResult) error {
136+
// Locked
137+
cacheReporter.mutex.Lock()
138+
defer cacheReporter.mutex.Unlock()
139+
124140
executionEntry, err := cacheReporter.getExecution(executionId)
125141
if err != nil {
126142
return err
127143
}
128144

129145
if executionEntry.Status != cache_report.Ongoing {
130-
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
146+
return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
131147
}
132-
133-
_, alreadyThere := executionEntry.StepResults[step.ID]
148+
_, alreadyThere := executionEntry.StepResults[newStepData.StepId]
134149
if alreadyThere {
135-
log.Warning("a step execution was already reported for this step. overwriting.")
150+
// TODO: must fix: all steps should start empty values but already present. Check should be
151+
// done on Step.Started > 0 time
152+
//
153+
// Should divide between instanciation of step, and modification of step,
154+
// with respective checks step status
155+
return errors.New("a step execution start was already reported for this step. ignoring")
136156
}
137157

138-
// TODO: must test
139-
commandsB64 := []string{}
140-
isAutomated := true
141-
for _, cmd := range step.Commands {
142-
if cmd.Type == cacao.CommandTypeManual {
143-
isAutomated = false
144-
}
145-
if cmd.CommandB64 != "" {
146-
commandsB64 = append(commandsB64, cmd.CommandB64)
147-
} else {
148-
cmdB64 := b64.StdEncoding.EncodeToString([]byte(cmd.Command))
149-
commandsB64 = append(commandsB64, cmdB64)
150-
}
151-
}
158+
executionEntry.StepResults[newStepData.StepId] = newStepData
159+
// New code
160+
cacheReporter.Cache[executionId.String()] = executionEntry
152161

153-
newStepEntry := cache_report.StepResult{
154-
ExecutionId: executionId,
155-
StepId: step.ID,
156-
Started: cacheReporter.timeUtil.Now(),
157-
Ended: time.Time{},
158-
Variables: variables,
159-
CommandsB64: commandsB64,
160-
Status: cache_report.Ongoing,
161-
Error: nil,
162-
IsAutomated: isAutomated,
163-
}
164-
executionEntry.StepResults[step.ID] = newStepEntry
165162
return nil
163+
// Unlocked
166164
}
167165

168-
func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {
166+
func (cacheReporter *Cache) upateEndExecutionStep(executionId uuid.UUID, stepId string, returnVars cacao.Variables, stepError error, acceptedStepStati []cache_report.Status) error {
167+
// Locked
168+
cacheReporter.mutex.Lock()
169+
defer cacheReporter.mutex.Unlock()
170+
169171
executionEntry, err := cacheReporter.getExecution(executionId)
170172
if err != nil {
171173
return err
172174
}
173175

174176
if executionEntry.Status != cache_report.Ongoing {
175-
return errors.New("trying to report on the execution of a step for an already reported completed or failed execution")
177+
return errors.New("trying to report on the execution of a step for an already reportedly terminated playbook execution")
178+
// Unlocked
176179
}
177-
178-
executionStepResult, err := cacheReporter.getExecutionStep(executionId, step.ID)
179-
if err != nil {
180-
return err
180+
executionStepResult, ok := executionEntry.StepResults[stepId]
181+
if !ok {
182+
// TODO: must fix: all steps should start empty values but already present. Check should be
183+
// done on Step.Started > 0 time
184+
return errors.New("trying to update a step which was not (yet?) recorded in the cache")
185+
// Unlocked
181186
}
182187

183-
if executionStepResult.Status != cache_report.Ongoing {
184-
return errors.New("trying to report on the execution of a step that was already reported completed or failed")
188+
if !slices.Contains(acceptedStepStati, executionStepResult.Status) {
189+
return fmt.Errorf("step status precondition not met for step update [step status: %s]", executionStepResult.Status.String())
185190
}
186191

187192
if stepError != nil {
@@ -192,27 +197,22 @@ func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step
192197
}
193198
executionStepResult.Ended = cacheReporter.timeUtil.Now()
194199
executionStepResult.Variables = returnVars
195-
executionEntry.StepResults[step.ID] = executionStepResult
200+
executionEntry.StepResults[stepId] = executionStepResult
201+
cacheReporter.Cache[executionId.String()] = executionEntry
196202

197203
return nil
204+
// Unlocked
198205
}
199206

207+
// ############################### Informer interface
208+
200209
func (cacheReporter *Cache) GetExecutions() ([]cache_report.ExecutionEntry, error) {
201-
executions := make([]cache_report.ExecutionEntry, 0)
202-
// NOTE: fetched via fifo register key reference as is ordered array,
203-
// needed to test and report back ordered executions stored
204-
for _, executionEntryKey := range cacheReporter.fifoRegister {
205-
// NOTE: cached executions are passed by reference, so they must not be modified
206-
entry, present := cacheReporter.Cache[executionEntryKey]
207-
if !present {
208-
return []cache_report.ExecutionEntry{}, errors.New("internal error. cache fifo register and cache executions mismatch.")
209-
}
210-
executions = append(executions, entry)
211-
}
212-
return executions, nil
210+
executions, err := cacheReporter.getAllExecutions()
211+
return executions, err
213212
}
214213

215214
func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_report.ExecutionEntry, error) {
215+
216216
executionEntry, err := cacheReporter.getExecution(executionKey)
217217
if err != nil {
218218
return cache_report.ExecutionEntry{}, err
@@ -221,3 +221,74 @@ func (cacheReporter *Cache) GetExecutionReport(executionKey uuid.UUID) (cache_re
221221

222222
return report, nil
223223
}
224+
225+
// ############################### Reporting interface
226+
227+
func (cacheReporter *Cache) ReportWorkflowStart(executionId uuid.UUID, playbook cacao.Playbook) error {
228+
229+
newExecutionEntry := cache_report.ExecutionEntry{
230+
ExecutionId: executionId,
231+
PlaybookId: playbook.ID,
232+
Started: cacheReporter.timeUtil.Now(),
233+
Ended: time.Time{},
234+
StepResults: map[string]cache_report.StepResult{},
235+
Status: cache_report.Ongoing,
236+
}
237+
err := cacheReporter.addExecutionFIFO(newExecutionEntry)
238+
if err != nil {
239+
return err
240+
}
241+
return nil
242+
}
243+
244+
func (cacheReporter *Cache) ReportWorkflowEnd(executionId uuid.UUID, playbook cacao.Playbook, workflowError error) error {
245+
246+
err := cacheReporter.upateEndExecutionWorkflow(executionId, workflowError)
247+
return err
248+
}
249+
250+
func (cacheReporter *Cache) ReportStepStart(executionId uuid.UUID, step cacao.Step, variables cacao.Variables) error {
251+
252+
commandsB64 := []string{}
253+
isAutomated := true
254+
for _, cmd := range step.Commands {
255+
if cmd.Type == cacao.CommandTypeManual {
256+
isAutomated = false
257+
}
258+
if cmd.CommandB64 != "" {
259+
commandsB64 = append(commandsB64, cmd.CommandB64)
260+
} else {
261+
cmdB64 := b64.StdEncoding.EncodeToString([]byte(cmd.Command))
262+
commandsB64 = append(commandsB64, cmdB64)
263+
}
264+
}
265+
266+
newStep := cache_report.StepResult{
267+
ExecutionId: executionId,
268+
StepId: step.ID,
269+
Started: cacheReporter.timeUtil.Now(),
270+
Ended: time.Time{},
271+
Variables: variables,
272+
CommandsB64: commandsB64,
273+
Status: cache_report.Ongoing,
274+
Error: nil,
275+
IsAutomated: isAutomated,
276+
}
277+
278+
err := cacheReporter.addStartExecutionStep(executionId, newStep)
279+
280+
return err
281+
}
282+
283+
func (cacheReporter *Cache) ReportStepEnd(executionId uuid.UUID, step cacao.Step, returnVars cacao.Variables, stepError error) error {
284+
285+
// stepId, err := uuid.Parse(step.ID)
286+
// if err != nil {
287+
// return fmt.Errorf("could not parse to uuid the step id: %s", step.ID)
288+
// }
289+
290+
acceptedStepStati := []cache_report.Status{cache_report.Ongoing}
291+
err := cacheReporter.upateEndExecutionStep(executionId, step.ID, returnVars, stepError, acceptedStepStati)
292+
293+
return err
294+
}

0 commit comments

Comments
 (0)