Skip to content

Commit 74b4a43

Browse files
authored
Final set of initial features for fuzzy checker (#43)
* Do initial inputs, get rid of various configurable todos * Cancel types for remote activities * Add nested actions / debug output / fix action probability clamping * Add continue as new
1 parent a515275 commit 74b4a43

File tree

10 files changed

+817
-699
lines changed

10 files changed

+817
-699
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,5 @@ workflow that waits for a signal for a configurable amount of time.
133133

134134
## Fuzzer trophy case
135135

136-
* Python upsert SA with no initial attributes: [PR](https://github.com/temporalio/sdk-python/pull/440)
136+
* Python upsert SA with no initial attributes: [PR](https://github.com/temporalio/sdk-python/pull/440)
137+
* Core cancel-before-start on abandon activities: [PR](https://github.com/temporalio/sdk-core/pull/652)

loadgen/kitchen-sink-gen/src/main.rs

Lines changed: 97 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod protos;
22

3+
use crate::protos::temporal::omes::kitchen_sink::{ActivityCancellationType, ContinueAsNewAction};
34
use crate::protos::temporal::{
45
api::common::v1::{Memo, Payload, Payloads},
56
omes::kitchen_sink::{
@@ -72,6 +73,8 @@ struct GeneratorConfig {
7273
max_client_action_set_wait: Duration,
7374
/// How likely various actions are to be generated
7475
action_chances: ActionChances,
76+
/// Maximum number of initial actions in a workflow input
77+
max_initial_actions: usize,
7578
}
7679

7780
impl Default for GeneratorConfig {
@@ -84,6 +87,7 @@ impl Default for GeneratorConfig {
8487
max_payload_size: 256,
8588
max_client_action_set_wait: Duration::from_secs(1),
8689
action_chances: Default::default(),
90+
max_initial_actions: 10,
8791
}
8892
}
8993
}
@@ -99,12 +103,16 @@ struct ExampleCmd {
99103
clap::ArgGroup::new("output").args(&["use_stdout", "output_path"]),
100104
))]
101105
struct OutputConfig {
102-
/// Output goes to stdout as protobuf binary, this is the default.
106+
/// Output goes to stdout, this is the default.
103107
#[clap(long, default_value_t = true)]
104108
use_stdout: bool,
105109
/// Output goes to the provided file path as protobuf binary.
106110
#[clap(long)]
107111
output_path: Option<PathBuf>,
112+
/// Output will be in Rust debug format if set true. JSON is obnoxious to use with prost at
113+
/// the moment, and this option is really meant for human inspection.
114+
#[clap(long, default_value_t = false)]
115+
debug: bool,
108116
}
109117

110118
/// The relative likelihood of each action type being generated as floats which must sum to exactly
@@ -209,6 +217,7 @@ fn example(args: ExampleCmd) -> Result<(), Error> {
209217
.into()])],
210218
concurrent: false,
211219
wait_at_end: Some(Duration::from_secs(1).try_into().unwrap()),
220+
wait_for_current_run_to_finish_at_end: false,
212221
},
213222
ClientActionSet {
214223
actions: vec![mk_client_signal_action([
@@ -262,6 +271,8 @@ fn generate(args: GenerateCmd) -> Result<(), Error> {
262271
let context = ArbContext {
263272
config,
264273
cur_workflow_state: Default::default(),
274+
did_a_nested_action: false,
275+
action_set_nest_level: 0,
265276
};
266277
ARB_CONTEXT.set(context);
267278

@@ -281,41 +292,44 @@ thread_local! {
281292
}
282293

283294
static WF_STATE_FIELD_VALUE: &str = "x";
295+
static WF_TYPE_NAME: &str = "kitchenSink";
284296

285297
#[derive(Default)]
286298
struct ArbContext {
287299
config: GeneratorConfig,
288300
cur_workflow_state: WorkflowState,
301+
did_a_nested_action: bool,
302+
action_set_nest_level: usize,
289303
}
290304

291305
impl<'a> Arbitrary<'a> for TestInput {
292306
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
307+
// We always want a client sequence
308+
let mut client_sequence: ClientSequence = u.arbitrary()?;
293309
let mut ti = Self {
294310
// Input may or may not be present
295311
workflow_input: u.arbitrary()?,
296-
// We always want a client sequence
297-
client_sequence: Some(u.arbitrary()?),
312+
client_sequence: None,
298313
};
299-
// TODO: There needs to be some kind of coordination during generation to ensure
300-
// we don't have multiple returns etc.
301-
let cs = ti.client_sequence.get_or_insert(Default::default());
302-
cs.action_sets.push(ClientActionSet {
314+
315+
// Finally, return at the end
316+
client_sequence.action_sets.push(ClientActionSet {
303317
actions: vec![mk_client_signal_action([ReturnResultAction {
304318
return_this: Some(Payload::default()),
305319
}
306320
.into()])],
307321
..Default::default()
308322
});
323+
ti.client_sequence = Some(client_sequence);
309324
Ok(ti)
310325
}
311326
}
312327

313328
impl<'a> Arbitrary<'a> for WorkflowInput {
314-
fn arbitrary(_: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
315-
// TODO impl
316-
Ok(Self {
317-
initial_actions: vec![],
318-
})
329+
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
330+
let num_actions = 1..=ARB_CONTEXT.with_borrow(|c| c.config.max_initial_actions);
331+
let initial_actions = vec_of_size(u, num_actions)?;
332+
Ok(Self { initial_actions })
319333
}
320334
}
321335

@@ -329,24 +343,58 @@ impl<'a> Arbitrary<'a> for ClientSequence {
329343

330344
impl<'a> Arbitrary<'a> for ClientActionSet {
331345
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
332-
let num_actions = 1..=ARB_CONTEXT.with_borrow(|c| c.config.max_client_actions_per_set);
333-
Ok(Self {
334-
actions: vec_of_size(u, num_actions)?,
335-
concurrent: u.arbitrary()?,
336-
wait_at_end: u.arbitrary::<Option<ClientActionWait>>()?.map(Into::into),
337-
})
346+
let nest_level = ARB_CONTEXT.with_borrow_mut(|c| {
347+
c.action_set_nest_level += 1;
348+
c.action_set_nest_level
349+
});
350+
// Small chance of continuing as new
351+
let action_set = if nest_level == 1 && u.ratio(1, 100)? {
352+
let actions = vec![mk_client_signal_action([ContinueAsNewAction {
353+
workflow_type: WF_TYPE_NAME.to_string(),
354+
arguments: vec![to_proto_payload(
355+
WorkflowInput {
356+
initial_actions: vec![mk_action_set([action::Variant::SetWorkflowState(
357+
ARB_CONTEXT.with_borrow(|c| c.cur_workflow_state.clone()),
358+
)])],
359+
},
360+
"temporal.omes.kitchen_sink.WorkflowInput",
361+
)],
362+
..Default::default()
363+
}
364+
.into()])];
365+
Self {
366+
actions,
367+
wait_for_current_run_to_finish_at_end: true,
368+
..Default::default()
369+
}
370+
} else {
371+
let num_actions = 1..=ARB_CONTEXT.with_borrow(|c| c.config.max_client_actions_per_set);
372+
Self {
373+
actions: vec_of_size(u, num_actions)?,
374+
concurrent: u.arbitrary()?,
375+
wait_at_end: u.arbitrary::<Option<ClientActionWait>>()?.map(Into::into),
376+
wait_for_current_run_to_finish_at_end: false,
377+
}
378+
};
379+
ARB_CONTEXT.with_borrow_mut(|c| c.action_set_nest_level -= 1);
380+
Ok(action_set)
338381
}
339382
}
340383

341384
impl<'a> Arbitrary<'a> for ClientAction {
342385
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
343-
// TODO: Adjustable ratio of choice?
344-
let action_kind = u.int_in_range(0..=2)?;
386+
// Too much nesting can lead to very long action sets, which are also very confusing
387+
// to understand. One level of nesting ought to be sufficient for coverage.
388+
let action_kind = if ARB_CONTEXT.with_borrow(|c| c.action_set_nest_level == 1) {
389+
u.int_in_range(0..=3)?
390+
} else {
391+
u.int_in_range(0..=2)?
392+
};
345393
let variant = match action_kind {
346394
0 => client_action::Variant::DoSignal(u.arbitrary()?),
347395
1 => client_action::Variant::DoQuery(u.arbitrary()?),
348396
2 => client_action::Variant::DoUpdate(u.arbitrary()?),
349-
// TODO: Nested, if/when desired
397+
3 => client_action::Variant::NestedActions(u.arbitrary()?),
350398
_ => unreachable!(),
351399
};
352400
Ok(Self {
@@ -357,7 +405,6 @@ impl<'a> Arbitrary<'a> for ClientAction {
357405

358406
impl<'a> Arbitrary<'a> for DoSignal {
359407
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
360-
// TODO: Configurable?
361408
let variant = if u.ratio(95, 100)? {
362409
// 95% of the time do actions
363410
// Half of that in the handler half in main
@@ -383,7 +430,6 @@ impl<'a> Arbitrary<'a> for DoSignal {
383430
impl<'a> Arbitrary<'a> for DoQuery {
384431
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
385432
let mut failure_expected = false;
386-
// TODO: Configurable?
387433
let variant = if u.ratio(95, 100)? {
388434
// 95% of the time report state
389435
do_query::Variant::ReportState(u.arbitrary()?)
@@ -402,7 +448,6 @@ impl<'a> Arbitrary<'a> for DoQuery {
402448
impl<'a> Arbitrary<'a> for DoUpdate {
403449
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
404450
let mut failure_expected = false;
405-
// TODO: Configurable?
406451
let variant = if u.ratio(95, 100)? {
407452
// 95% of the time do actions
408453
do_update::Variant::DoActions(
@@ -436,7 +481,7 @@ impl<'a> Arbitrary<'a> for ActionSet {
436481

437482
impl<'a> Arbitrary<'a> for Action {
438483
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
439-
let action_kind = u.int_in_range(0..=100)? as f32;
484+
let action_kind = u.int_in_range(0..=1_000)? as f32 / 10.0;
440485
let chances = ARB_CONTEXT.with_borrow(|c| c.config.action_chances.clone());
441486
let variant = if chances.timer(action_kind) {
442487
action::Variant::Timer(u.arbitrary()?)
@@ -503,7 +548,6 @@ impl<'a> Arbitrary<'a> for TimerAction {
503548

504549
impl<'a> Arbitrary<'a> for ExecuteActivityAction {
505550
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
506-
// TODO: configurable ratio?
507551
let locality = if u.ratio(50, 100)? {
508552
execute_activity_action::Locality::Remote(u.arbitrary()?)
509553
} else {
@@ -547,7 +591,7 @@ impl<'a> Arbitrary<'a> for ExecuteChildWorkflowAction {
547591
let input = to_proto_payload(input, "temporal.omes.kitchen_sink.WorkflowInput");
548592
Ok(Self {
549593
// Use KS as own child, with an input to just return right away
550-
workflow_type: "kitchenSink".to_string(),
594+
workflow_type: WF_TYPE_NAME.to_string(),
551595
input: vec![input],
552596
awaitable_choice: Some(u.arbitrary()?),
553597
..Default::default()
@@ -620,15 +664,28 @@ impl<'a> Arbitrary<'a> for UpsertSearchAttributesAction {
620664
}
621665

622666
impl<'a> Arbitrary<'a> for RemoteActivityOptions {
623-
fn arbitrary(_: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
624-
// TODO: impl
625-
Ok(Self::default())
667+
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
668+
Ok(RemoteActivityOptions {
669+
cancellation_type: u.arbitrary::<ActivityCancellationType>()?.into(),
670+
do_not_eagerly_execute: false,
671+
versioning_intent: 0,
672+
})
673+
}
674+
}
675+
676+
impl<'a> Arbitrary<'a> for ActivityCancellationType {
677+
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
678+
let choices = [
679+
ActivityCancellationType::Abandon,
680+
ActivityCancellationType::TryCancel,
681+
ActivityCancellationType::WaitCancellationCompleted,
682+
];
683+
Ok(*u.choose(&choices)?)
626684
}
627685
}
628686

629687
impl<'a> Arbitrary<'a> for Payloads {
630688
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
631-
// TODO: configurable ratio?
632689
let payloads = if u.ratio(80, 100)? {
633690
vec![u.arbitrary()?]
634691
} else if u.ratio(50, 100)? {
@@ -699,15 +756,17 @@ fn vec_of_size<'a, T: Arbitrary<'a>>(
699756

700757
fn output_proto(generated_input: TestInput, output_kind: OutputConfig) -> Result<(), Error> {
701758
let mut buf = Vec::with_capacity(1024 * 10);
702-
generated_input.encode(&mut buf)?;
703-
if output_kind.use_stdout {
704-
std::io::stdout().write_all(&buf)?;
759+
if output_kind.debug {
760+
let as_str = format!("{:#?}", generated_input);
761+
buf.write_all(as_str.as_bytes())?;
705762
} else {
706-
let path = output_kind
707-
.output_path
708-
.expect("Output path must have been set");
763+
generated_input.encode(&mut buf)?;
764+
}
765+
if let Some(path) = output_kind.output_path {
709766
let mut file = std::fs::File::create(path)?;
710767
file.write_all(&buf)?;
768+
} else {
769+
std::io::stdout().write_all(&buf)?;
711770
}
712771
Ok(())
713772
}
@@ -733,7 +792,7 @@ fn mk_action_set(actions: impl IntoIterator<Item = action::Variant>) -> ActionSe
733792
variant: Some(variant),
734793
})
735794
.collect(),
736-
concurrent: true,
795+
concurrent: false,
737796
}
738797
}
739798

loadgen/kitchensink/helpers.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package kitchensink
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"go.temporal.io/api/common/v1"
78
"go.temporal.io/sdk/client"
9+
"go.temporal.io/sdk/workflow"
810
"golang.org/x/sync/errgroup"
911
"google.golang.org/protobuf/types/known/durationpb"
1012
"time"
@@ -81,6 +83,15 @@ func (e *ClientActionsExecutor) executeClientActionSet(ctx context.Context, acti
8183
}
8284
}
8385
}
86+
if actionSet.GetWaitForCurrentRunToFinishAtEnd() {
87+
err := e.Client.GetWorkflow(ctx, e.WorkflowID, e.RunID).
88+
GetWithOptions(ctx, nil, client.WorkflowRunGetOptions{DisableFollowingRuns: true})
89+
var canErr *workflow.ContinueAsNewError
90+
if err != nil && !errors.As(err, &canErr) {
91+
return err
92+
}
93+
e.RunID = e.Client.GetWorkflow(ctx, e.WorkflowID, "").GetRunID()
94+
}
8495
return nil
8596
}
8697

@@ -93,31 +104,35 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
93104
var err error
94105
if sig := action.GetDoSignal(); sig != nil {
95106
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
96-
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, e.RunID, "do_actions_signal", sigActions)
107+
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", "do_actions_signal", sigActions)
97108
} else if handler := sig.GetCustom(); handler != nil {
98-
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, e.RunID, handler.Name, handler.Args)
109+
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
99110
} else {
100111
return fmt.Errorf("do_signal must recognizable variant")
101112
}
102113
return err
103114
} else if update := action.GetDoUpdate(); update != nil {
115+
var handle client.WorkflowUpdateHandle
104116
if actionsUpdate := update.GetDoActions(); actionsUpdate != nil {
105-
_, err = e.Client.UpdateWorkflow(ctx, e.WorkflowID, e.RunID, "do_actions_update", actionsUpdate)
117+
handle, err = e.Client.UpdateWorkflow(ctx, e.WorkflowID, "", "do_actions_update", actionsUpdate)
106118
} else if handler := update.GetCustom(); handler != nil {
107-
_, err = e.Client.UpdateWorkflow(ctx, e.WorkflowID, e.RunID, handler.Name, handler.Args)
119+
handle, err = e.Client.UpdateWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
108120
} else {
109121
return fmt.Errorf("do_update must recognizable variant")
110122
}
123+
if err == nil {
124+
err = handle.Get(ctx, nil)
125+
}
111126
if update.FailureExpected {
112127
err = nil
113128
}
114129
return err
115130
} else if query := action.GetDoQuery(); query != nil {
116131
if query.GetReportState() != nil {
117132
// TODO: Use args
118-
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, e.RunID, "report_state", nil)
133+
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", "report_state", nil)
119134
} else if handler := query.GetCustom(); handler != nil {
120-
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, e.RunID, handler.Name, handler.Args)
135+
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
121136
} else {
122137
return fmt.Errorf("do_query must recognizable variant")
123138
}

0 commit comments

Comments
 (0)