Skip to content

Commit 422dfcf

Browse files
authored
Merge pull request #47 from santisq/46-await-statements-should-be-set-to-configureawaitfalse-for-better-performance-and-to-avoid-deadlocks
Adds `ConfigureAwait(false)` to all `await` statements
2 parents c972bf5 + 91e7fc2 commit 422dfcf

File tree

8 files changed

+131
-112
lines changed

8 files changed

+131
-112
lines changed

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,12 @@ protected override void BeginProcessing()
5959
.AddFunctions(Functions, this)
6060
.AddVariables(Variables, this);
6161

62-
PoolSettings poolSettings = new()
63-
{
64-
MaxRunspaces = ThrottleLimit,
65-
UseNewRunspace = UseNewRunspace,
66-
InitialSessionState = iss
67-
};
62+
PoolSettings poolSettings = new(
63+
ThrottleLimit, UseNewRunspace, iss);
6864

69-
TaskSettings workerSettings = new()
70-
{
71-
Script = ScriptBlock.ToString(),
72-
UsingStatements = ScriptBlock.GetUsingParameters(this)
73-
};
65+
TaskSettings workerSettings = new(
66+
ScriptBlock.ToString(),
67+
ScriptBlock.GetUsingParameters(this));
7468

7569
_worker = new Worker(poolSettings, workerSettings, _cts.Token);
7670
_worker.Run();
@@ -96,7 +90,7 @@ protected override void ProcessRecord()
9690
}
9791
catch (OperationCanceledException exception)
9892
{
99-
_worker.Wait();
93+
_worker.WaitForCompletion();
10094
exception.WriteTimeoutError(this);
10195
}
10296
}
@@ -113,7 +107,7 @@ protected override void EndProcessing()
113107
ProcessOutput(data);
114108
}
115109

116-
_worker.Wait();
110+
_worker.WaitForCompletion();
117111
}
118112
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
119113
{
@@ -122,7 +116,7 @@ protected override void EndProcessing()
122116
}
123117
catch (OperationCanceledException exception)
124118
{
125-
_worker.Wait();
119+
_worker.WaitForCompletion();
126120
exception.WriteTimeoutError(this);
127121
}
128122
}
@@ -131,33 +125,33 @@ private void ProcessOutput(PSOutputData data)
131125
{
132126
switch (data.Type)
133127
{
134-
case Type.Success:
128+
case OutputType.Success:
135129
WriteObject(data.Output);
136130
break;
137131

138-
case Type.Error:
132+
case OutputType.Error:
139133
WriteError((ErrorRecord)data.Output);
140134
break;
141135

142-
case Type.Debug:
136+
case OutputType.Debug:
143137
DebugRecord debug = (DebugRecord)data.Output;
144138
WriteDebug(debug.Message);
145139
break;
146140

147-
case Type.Information:
141+
case OutputType.Information:
148142
WriteInformation((InformationRecord)data.Output);
149143
break;
150144

151-
case Type.Progress:
145+
case OutputType.Progress:
152146
WriteProgress((ProgressRecord)data.Output);
153147
break;
154148

155-
case Type.Verbose:
149+
case OutputType.Verbose:
156150
VerboseRecord verbose = (VerboseRecord)data.Output;
157151
WriteVerbose(verbose.Message);
158152
break;
159153

160-
case Type.Warning:
154+
case OutputType.Warning:
161155
WarningRecord warning = (WarningRecord)data.Output;
162156
WriteWarning(warning.Message);
163157
break;
@@ -167,7 +161,7 @@ private void ProcessOutput(PSOutputData data)
167161
private void CancelAndWait()
168162
{
169163
_cts.Cancel();
170-
_worker?.Wait();
164+
_worker?.WaitForCompletion();
171165
}
172166

173167
protected override void StopProcessing() => CancelAndWait();

src/PSParallelPipeline/OutputType.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace PSParallelPipeline;
2+
3+
internal enum OutputType
4+
{
5+
Success,
6+
Error,
7+
Debug,
8+
Information,
9+
Progress,
10+
Verbose,
11+
Warning
12+
}
Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,25 @@
11
namespace PSParallelPipeline;
22

3-
internal enum Type
4-
{
5-
Success,
6-
Error,
7-
Debug,
8-
Information,
9-
Progress,
10-
Verbose,
11-
Warning
12-
}
13-
14-
internal record struct PSOutputData(Type Type, object Output)
3+
internal record struct PSOutputData(OutputType Type, object Output)
154
{
165
internal static PSOutputData WriteObject(object sendToPipeline) =>
17-
new(Type.Success, sendToPipeline);
6+
new(OutputType.Success, sendToPipeline);
187

198
internal static PSOutputData WriteError(object error) =>
20-
new(Type.Error, error);
9+
new(OutputType.Error, error);
2110

2211
internal static PSOutputData WriteDebug(object debug) =>
23-
new(Type.Debug, debug);
12+
new(OutputType.Debug, debug);
2413

2514
internal static PSOutputData WriteInformation(object information) =>
26-
new(Type.Information, information);
15+
new(OutputType.Information, information);
2716

2817
internal static PSOutputData WriteProgress(object progress) =>
29-
new(Type.Progress, progress);
18+
new(OutputType.Progress, progress);
3019

3120
internal static PSOutputData WriteVerbose(object verbose) =>
32-
new(Type.Verbose, verbose);
21+
new(OutputType.Verbose, verbose);
3322

3423
internal static PSOutputData WriteWarning(object warning) =>
35-
new(Type.Warning, warning);
24+
new(OutputType.Warning, warning);
3625
}

src/PSParallelPipeline/PSTask.cs

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,42 +9,66 @@ namespace PSParallelPipeline;
99

1010
internal sealed class PSTask
1111
{
12+
private const string SetVariableCommand = "Set-Variable";
13+
14+
private const string DollarUnderbar = "_";
15+
16+
private const string StopParsingOp = "--%";
17+
1218
private readonly PowerShell _powershell;
1319

1420
private readonly PSDataStreams _internalStreams;
1521

16-
private readonly RunspacePool _pool;
22+
private Runspace? _runspace;
1723

18-
private PSOutputStreams OutputStreams { get => _pool.Streams; }
24+
private readonly PSOutputStreams _outputStreams;
1925

20-
private Runspace Runspace
21-
{
22-
get => _powershell.Runspace;
23-
set => _powershell.Runspace = value;
24-
}
26+
private readonly CancellationToken _token;
27+
28+
private readonly RunspacePool _pool;
2529

2630
private PSTask(RunspacePool pool)
2731
{
2832
_powershell = PowerShell.Create();
2933
_internalStreams = _powershell.Streams;
34+
_outputStreams = pool.Streams;
35+
_token = pool.Token;
3036
_pool = pool;
3137
}
3238

33-
static internal async Task<PSTask> CreateAsync(
39+
static internal PSTask Create(
3440
object? input,
3541
RunspacePool runspacePool,
3642
TaskSettings settings)
3743
{
3844
PSTask ps = new(runspacePool);
3945
SetStreams(ps._internalStreams, runspacePool.Streams);
40-
ps.Runspace = await runspacePool.GetRunspaceAsync();
4146

4247
return ps
4348
.AddInput(input)
4449
.AddScript(settings.Script)
4550
.AddUsingStatements(settings.UsingStatements);
4651
}
4752

53+
internal async Task InvokeAsync()
54+
{
55+
try
56+
{
57+
using CancellationTokenRegistration _ = _token.Register(Cancel);
58+
_runspace = await _pool.GetRunspaceAsync().ConfigureAwait(false);
59+
_powershell.Runspace = _runspace;
60+
await InvokePowerShellAsync(_powershell, _outputStreams.Success).ConfigureAwait(false);
61+
}
62+
catch (Exception exception)
63+
{
64+
_outputStreams.AddOutput(exception.CreateProcessingTaskError(this));
65+
}
66+
finally
67+
{
68+
CompleteTask();
69+
}
70+
}
71+
4872
private static void SetStreams(
4973
PSDataStreams streams,
5074
PSOutputStreams outputStreams)
@@ -69,8 +93,8 @@ private PSTask AddInput(object? inputObject)
6993
if (inputObject is not null)
7094
{
7195
_powershell
72-
.AddCommand("Set-Variable", useLocalScope: true)
73-
.AddArgument("_")
96+
.AddCommand(SetVariableCommand, useLocalScope: true)
97+
.AddArgument(DollarUnderbar)
7498
.AddArgument(inputObject);
7599
}
76100

@@ -87,33 +111,27 @@ private PSTask AddUsingStatements(Dictionary<string, object?> usingParams)
87111
{
88112
if (usingParams.Count > 0)
89113
{
90-
_powershell.AddParameter("--%", usingParams);
114+
_powershell.AddParameter(StopParsingOp, usingParams);
91115
}
92116

93117
return this;
94118
}
95119

96-
internal async Task InvokeAsync()
120+
private void CompleteTask()
97121
{
98-
try
99-
{
100-
using CancellationTokenRegistration _ = _pool.RegisterCancellation(Cancel);
101-
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
102-
}
103-
catch (Exception exception)
104-
{
105-
OutputStreams.AddOutput(exception.CreateProcessingTaskError(this));
106-
}
107-
finally
122+
_powershell.Dispose();
123+
if (!_token.IsCancellationRequested && _runspace is not null)
108124
{
109-
_powershell.Dispose();
110-
_pool.PushRunspace(Runspace);
125+
_pool.PushRunspace(_runspace);
126+
return;
111127
}
128+
129+
_runspace?.Dispose();
112130
}
113131

114132
private void Cancel()
115133
{
116134
_powershell.Dispose();
117-
Runspace.Dispose();
135+
_runspace?.Dispose();
118136
}
119137
}

src/PSParallelPipeline/PoolSettings.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22

33
namespace PSParallelPipeline;
44

5-
internal record struct PoolSettings(
6-
int MaxRunspaces,
7-
bool UseNewRunspace,
8-
InitialSessionState InitialSessionState);
5+
internal class PoolSettings(
6+
int maxRunspaces,
7+
bool useNewRunspace,
8+
InitialSessionState initialSessionState)
9+
{
10+
internal int MaxRunspaces { get; } = maxRunspaces;
11+
12+
internal bool UseNewRunspace { get; } = useNewRunspace;
13+
14+
internal InitialSessionState InitialSessionState { get; } = initialSessionState;
15+
}

src/PSParallelPipeline/RunspacePool.cs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,68 +8,61 @@ namespace PSParallelPipeline;
88

99
internal sealed class RunspacePool : IDisposable
1010
{
11-
private readonly CancellationToken _token;
11+
private readonly SemaphoreSlim _semaphore;
1212

13-
private readonly InitialSessionState _iss;
13+
private readonly PoolSettings _settings;
1414

1515
private readonly ConcurrentQueue<Runspace> _pool = [];
1616

17-
private readonly ConcurrentDictionary<Guid, Runspace> _created;
17+
private bool UseNewRunspace { get => _settings.UseNewRunspace; }
1818

19-
private readonly bool _useNew;
19+
internal int MaxRunspaces { get => _settings.MaxRunspaces; }
2020

21-
private readonly SemaphoreSlim _semaphore;
21+
internal CancellationToken Token { get; }
2222

2323
internal PSOutputStreams Streams { get; }
2424

25-
internal int MaxRunspaces { get; }
26-
2725
internal RunspacePool(
2826
PoolSettings settings,
2927
PSOutputStreams streams,
3028
CancellationToken token)
3129
{
32-
(MaxRunspaces, _useNew, _iss) = settings;
3330
Streams = streams;
34-
_token = token;
31+
Token = token;
32+
_settings = settings;
3533
_semaphore = new SemaphoreSlim(MaxRunspaces, MaxRunspaces);
36-
_created = new ConcurrentDictionary<Guid, Runspace>(
37-
Environment.ProcessorCount,
38-
MaxRunspaces);
3934
}
4035

4136
internal void PushRunspace(Runspace runspace)
4237
{
43-
if (_token.IsCancellationRequested)
38+
if (Token.IsCancellationRequested)
4439
{
4540
return;
4641
}
4742

48-
if (_useNew)
43+
if (UseNewRunspace)
4944
{
5045
runspace.Dispose();
51-
_created.TryRemove(runspace.InstanceId, out _);
5246
runspace = CreateRunspace();
5347
}
5448

5549
_pool.Enqueue(runspace);
5650
_semaphore.Release();
5751
}
5852

59-
internal CancellationTokenRegistration RegisterCancellation(Action callback) =>
60-
_token.Register(callback);
61-
6253
private Runspace CreateRunspace()
6354
{
64-
Runspace rs = RunspaceFactory.CreateRunspace(_iss);
65-
_created[rs.InstanceId] = rs;
55+
Runspace rs = RunspaceFactory.CreateRunspace(_settings.InitialSessionState);
6656
rs.Open();
6757
return rs;
6858
}
6959

7060
internal async Task<Runspace> GetRunspaceAsync()
7161
{
72-
await _semaphore.WaitAsync(_token);
62+
await _semaphore
63+
.WaitAsync(Token)
64+
.ConfigureAwait(false);
65+
7366
if (_pool.TryDequeue(out Runspace runspace))
7467
{
7568
return runspace;
@@ -80,7 +73,7 @@ internal async Task<Runspace> GetRunspaceAsync()
8073

8174
public void Dispose()
8275
{
83-
foreach (Runspace runspace in _created.Values)
76+
foreach (Runspace runspace in _pool)
8477
{
8578
runspace.Dispose();
8679
}

0 commit comments

Comments
 (0)