Skip to content

Commit 91e7fc2

Browse files
committed
many code improvements
1 parent d6e754a commit 91e7fc2

File tree

6 files changed

+92
-93
lines changed

6 files changed

+92
-93
lines changed

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ protected override void ProcessRecord()
9090
}
9191
catch (OperationCanceledException exception)
9292
{
93-
_worker.Wait();
93+
_worker.WaitForCompletion();
9494
exception.WriteTimeoutError(this);
9595
}
9696
}
@@ -107,7 +107,7 @@ protected override void EndProcessing()
107107
ProcessOutput(data);
108108
}
109109

110-
_worker.Wait();
110+
_worker.WaitForCompletion();
111111
}
112112
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
113113
{
@@ -116,7 +116,7 @@ protected override void EndProcessing()
116116
}
117117
catch (OperationCanceledException exception)
118118
{
119-
_worker.Wait();
119+
_worker.WaitForCompletion();
120120
exception.WriteTimeoutError(this);
121121
}
122122
}
@@ -125,33 +125,33 @@ private void ProcessOutput(PSOutputData data)
125125
{
126126
switch (data.Type)
127127
{
128-
case Type.Success:
128+
case OutputType.Success:
129129
WriteObject(data.Output);
130130
break;
131131

132-
case Type.Error:
132+
case OutputType.Error:
133133
WriteError((ErrorRecord)data.Output);
134134
break;
135135

136-
case Type.Debug:
136+
case OutputType.Debug:
137137
DebugRecord debug = (DebugRecord)data.Output;
138138
WriteDebug(debug.Message);
139139
break;
140140

141-
case Type.Information:
141+
case OutputType.Information:
142142
WriteInformation((InformationRecord)data.Output);
143143
break;
144144

145-
case Type.Progress:
145+
case OutputType.Progress:
146146
WriteProgress((ProgressRecord)data.Output);
147147
break;
148148

149-
case Type.Verbose:
149+
case OutputType.Verbose:
150150
VerboseRecord verbose = (VerboseRecord)data.Output;
151151
WriteVerbose(verbose.Message);
152152
break;
153153

154-
case Type.Warning:
154+
case OutputType.Warning:
155155
WarningRecord warning = (WarningRecord)data.Output;
156156
WriteWarning(warning.Message);
157157
break;
@@ -161,7 +161,7 @@ private void ProcessOutput(PSOutputData data)
161161
private void CancelAndWait()
162162
{
163163
_cts.Cancel();
164-
_worker?.Wait();
164+
_worker?.WaitForCompletion();
165165
}
166166

167167
protected override void StopProcessing() => CancelAndWait();
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace PSParallelPipeline;
2+
3+
internal enum OutputType
4+
{
5+
Success,
6+
Error,
7+
Debug,
8+
Information,
9+
Progress,
10+
Verbose,
11+
Warning
12+
}
Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,25 @@
11
namespace PSParallelPipeline;
22

3-
internal enum Type
4-
{
5-
Success,
6-
Error,
7-
Debug,
8-
Information,
9-
Progress,
10-
Verbose,
11-
Warning
12-
}
13-
14-
internal record struct PSOutputData(Type Type, object Output)
3+
internal record struct PSOutputData(OutputType Type, object Output)
154
{
165
internal static PSOutputData WriteObject(object sendToPipeline) =>
17-
new(Type.Success, sendToPipeline);
6+
new(OutputType.Success, sendToPipeline);
187

198
internal static PSOutputData WriteError(object error) =>
20-
new(Type.Error, error);
9+
new(OutputType.Error, error);
2110

2211
internal static PSOutputData WriteDebug(object debug) =>
23-
new(Type.Debug, debug);
12+
new(OutputType.Debug, debug);
2413

2514
internal static PSOutputData WriteInformation(object information) =>
26-
new(Type.Information, information);
15+
new(OutputType.Information, information);
2716

2817
internal static PSOutputData WriteProgress(object progress) =>
29-
new(Type.Progress, progress);
18+
new(OutputType.Progress, progress);
3019

3120
internal static PSOutputData WriteVerbose(object verbose) =>
32-
new(Type.Verbose, verbose);
21+
new(OutputType.Verbose, verbose);
3322

3423
internal static PSOutputData WriteWarning(object warning) =>
35-
new(Type.Warning, warning);
24+
new(OutputType.Warning, warning);
3625
}

src/PSParallelPipeline/PSTask.cs

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,44 +9,66 @@ namespace PSParallelPipeline;
99

1010
internal sealed class PSTask
1111
{
12+
private const string SetVariableCommand = "Set-Variable";
13+
14+
private const string DollarUnderbar = "_";
15+
16+
private const string StopParsingOp = "--%";
17+
1218
private readonly PowerShell _powershell;
1319

1420
private readonly PSDataStreams _internalStreams;
1521

16-
private readonly RunspacePool _pool;
22+
private Runspace? _runspace;
1723

18-
private PSOutputStreams OutputStreams { get => _pool.Streams; }
24+
private readonly PSOutputStreams _outputStreams;
1925

20-
private Runspace Runspace
21-
{
22-
get => _powershell.Runspace;
23-
set => _powershell.Runspace = value;
24-
}
26+
private readonly CancellationToken _token;
27+
28+
private readonly RunspacePool _pool;
2529

2630
private PSTask(RunspacePool pool)
2731
{
2832
_powershell = PowerShell.Create();
2933
_internalStreams = _powershell.Streams;
34+
_outputStreams = pool.Streams;
35+
_token = pool.Token;
3036
_pool = pool;
3137
}
3238

33-
static internal async Task<PSTask> CreateAsync(
39+
static internal PSTask Create(
3440
object? input,
3541
RunspacePool runspacePool,
3642
TaskSettings settings)
3743
{
3844
PSTask ps = new(runspacePool);
3945
SetStreams(ps._internalStreams, runspacePool.Streams);
40-
ps.Runspace = await runspacePool
41-
.GetRunspaceAsync()
42-
.ConfigureAwait(false);
4346

4447
return ps
4548
.AddInput(input)
4649
.AddScript(settings.Script)
4750
.AddUsingStatements(settings.UsingStatements);
4851
}
4952

53+
internal async Task InvokeAsync()
54+
{
55+
try
56+
{
57+
using CancellationTokenRegistration _ = _token.Register(Cancel);
58+
_runspace = await _pool.GetRunspaceAsync().ConfigureAwait(false);
59+
_powershell.Runspace = _runspace;
60+
await InvokePowerShellAsync(_powershell, _outputStreams.Success).ConfigureAwait(false);
61+
}
62+
catch (Exception exception)
63+
{
64+
_outputStreams.AddOutput(exception.CreateProcessingTaskError(this));
65+
}
66+
finally
67+
{
68+
CompleteTask();
69+
}
70+
}
71+
5072
private static void SetStreams(
5173
PSDataStreams streams,
5274
PSOutputStreams outputStreams)
@@ -71,8 +93,8 @@ private PSTask AddInput(object? inputObject)
7193
if (inputObject is not null)
7294
{
7395
_powershell
74-
.AddCommand("Set-Variable", useLocalScope: true)
75-
.AddArgument("_")
96+
.AddCommand(SetVariableCommand, useLocalScope: true)
97+
.AddArgument(DollarUnderbar)
7698
.AddArgument(inputObject);
7799
}
78100

@@ -89,33 +111,27 @@ private PSTask AddUsingStatements(Dictionary<string, object?> usingParams)
89111
{
90112
if (usingParams.Count > 0)
91113
{
92-
_powershell.AddParameter("--%", usingParams);
114+
_powershell.AddParameter(StopParsingOp, usingParams);
93115
}
94116

95117
return this;
96118
}
97119

98-
internal async Task InvokeAsync()
120+
private void CompleteTask()
99121
{
100-
try
101-
{
102-
using CancellationTokenRegistration _ = _pool.RegisterCancellation(Cancel);
103-
await InvokePowerShellAsync(_powershell, OutputStreams.Success).ConfigureAwait(false);
104-
}
105-
catch (Exception exception)
106-
{
107-
OutputStreams.AddOutput(exception.CreateProcessingTaskError(this));
108-
}
109-
finally
122+
_powershell.Dispose();
123+
if (!_token.IsCancellationRequested && _runspace is not null)
110124
{
111-
_powershell.Dispose();
112-
_pool.PushRunspace(Runspace);
125+
_pool.PushRunspace(_runspace);
126+
return;
113127
}
128+
129+
_runspace?.Dispose();
114130
}
115131

116132
private void Cancel()
117133
{
118134
_powershell.Dispose();
119-
Runspace.Dispose();
135+
_runspace?.Dispose();
120136
}
121137
}

src/PSParallelPipeline/RunspacePool.cs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,71 +8,59 @@ namespace PSParallelPipeline;
88

99
internal sealed class RunspacePool : IDisposable
1010
{
11-
private readonly PoolSettings _settings;
12-
13-
private readonly CancellationToken _token;
11+
private readonly SemaphoreSlim _semaphore;
1412

15-
private InitialSessionState InitialSessionState { get => _settings.InitialSessionState; }
13+
private readonly PoolSettings _settings;
1614

1715
private readonly ConcurrentQueue<Runspace> _pool = [];
1816

19-
private readonly ConcurrentDictionary<Guid, Runspace> _created;
20-
2117
private bool UseNewRunspace { get => _settings.UseNewRunspace; }
2218

23-
private readonly SemaphoreSlim _semaphore;
19+
internal int MaxRunspaces { get => _settings.MaxRunspaces; }
2420

25-
internal PSOutputStreams Streams { get; }
21+
internal CancellationToken Token { get; }
2622

27-
internal int MaxRunspaces { get => _settings.MaxRunspaces; }
23+
internal PSOutputStreams Streams { get; }
2824

2925
internal RunspacePool(
3026
PoolSettings settings,
3127
PSOutputStreams streams,
3228
CancellationToken token)
3329
{
34-
_settings = settings;
3530
Streams = streams;
36-
_token = token;
31+
Token = token;
32+
_settings = settings;
3733
_semaphore = new SemaphoreSlim(MaxRunspaces, MaxRunspaces);
38-
_created = new ConcurrentDictionary<Guid, Runspace>(
39-
Environment.ProcessorCount,
40-
MaxRunspaces);
4134
}
4235

4336
internal void PushRunspace(Runspace runspace)
4437
{
45-
if (_token.IsCancellationRequested)
38+
if (Token.IsCancellationRequested)
4639
{
4740
return;
4841
}
4942

5043
if (UseNewRunspace)
5144
{
5245
runspace.Dispose();
53-
_created.TryRemove(runspace.InstanceId, out _);
5446
runspace = CreateRunspace();
5547
}
5648

5749
_pool.Enqueue(runspace);
5850
_semaphore.Release();
5951
}
6052

61-
internal CancellationTokenRegistration RegisterCancellation(Action callback) =>
62-
_token.Register(callback);
63-
6453
private Runspace CreateRunspace()
6554
{
66-
Runspace rs = RunspaceFactory.CreateRunspace(InitialSessionState);
67-
_created[rs.InstanceId] = rs;
55+
Runspace rs = RunspaceFactory.CreateRunspace(_settings.InitialSessionState);
6856
rs.Open();
6957
return rs;
7058
}
7159

7260
internal async Task<Runspace> GetRunspaceAsync()
7361
{
7462
await _semaphore
75-
.WaitAsync(_token)
63+
.WaitAsync(Token)
7664
.ConfigureAwait(false);
7765

7866
if (_pool.TryDequeue(out Runspace runspace))
@@ -85,7 +73,7 @@ await _semaphore
8573

8674
public void Dispose()
8775
{
88-
foreach (Runspace runspace in _created.Values)
76+
foreach (Runspace runspace in _pool)
8977
{
9078
runspace.Dispose();
9179
}

src/PSParallelPipeline/Worker.cs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ internal Worker(
3333
_pool = new RunspacePool(poolSettings, _streams, _token);
3434
}
3535

36-
internal void Wait() => _worker?.ConfigureAwait(false).GetAwaiter().GetResult();
36+
internal void WaitForCompletion() => _worker?.GetAwaiter().GetResult();
3737

3838
internal void Enqueue(object? input) => _input.Add(input, _token);
3939

@@ -58,17 +58,12 @@ private async Task Start()
5858
await ProcessAnyAsync(tasks).ConfigureAwait(false);
5959
}
6060

61-
PSTask task = await PSTask
62-
.CreateAsync(
63-
input: input,
64-
runspacePool: _pool,
65-
settings: _taskSettings)
66-
.ConfigureAwait(false);
67-
68-
tasks.Add(task.InvokeAsync());
61+
tasks.Add(PSTask
62+
.Create(input, _pool, _taskSettings)
63+
.InvokeAsync());
6964
}
7065
}
71-
catch
66+
catch (OperationCanceledException)
7267
{ }
7368
finally
7469
{
@@ -87,8 +82,7 @@ private static async Task ProcessAnyAsync(List<Task> tasks)
8782
.ConfigureAwait(false);
8883

8984
tasks.Remove(task);
90-
await task
91-
.ConfigureAwait(false);
85+
await task.ConfigureAwait(false);
9286
}
9387

9488
public void Dispose()

0 commit comments

Comments
 (0)