Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 16 additions & 22 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,12 @@
.AddFunctions(Functions, this)
.AddVariables(Variables, this);

PoolSettings poolSettings = new()
{
MaxRunspaces = ThrottleLimit,
UseNewRunspace = UseNewRunspace,
InitialSessionState = iss
};
PoolSettings poolSettings = new(
ThrottleLimit, UseNewRunspace, iss);

TaskSettings workerSettings = new()
{
Script = ScriptBlock.ToString(),
UsingStatements = ScriptBlock.GetUsingParameters(this)
};
TaskSettings workerSettings = new(
ScriptBlock.ToString(),
ScriptBlock.GetUsingParameters(this));

_worker = new Worker(poolSettings, workerSettings, _cts.Token);
_worker.Run();
Expand All @@ -96,7 +90,7 @@
}
catch (OperationCanceledException exception)
{
_worker.Wait();
_worker.WaitForCompletion();

Check warning on line 93 in src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

View check run for this annotation

Codecov / codecov/patch

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs#L93

Added line #L93 was not covered by tests
exception.WriteTimeoutError(this);
}
}
Expand All @@ -113,7 +107,7 @@
ProcessOutput(data);
}

_worker.Wait();
_worker.WaitForCompletion();
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
Expand All @@ -122,7 +116,7 @@
}
catch (OperationCanceledException exception)
{
_worker.Wait();
_worker.WaitForCompletion();
exception.WriteTimeoutError(this);
}
}
Expand All @@ -131,33 +125,33 @@
{
switch (data.Type)
{
case Type.Success:
case OutputType.Success:
WriteObject(data.Output);
break;

case Type.Error:
case OutputType.Error:
WriteError((ErrorRecord)data.Output);
break;

case Type.Debug:
case OutputType.Debug:
DebugRecord debug = (DebugRecord)data.Output;
WriteDebug(debug.Message);
break;

case Type.Information:
case OutputType.Information:
WriteInformation((InformationRecord)data.Output);
break;

case Type.Progress:
case OutputType.Progress:
WriteProgress((ProgressRecord)data.Output);
break;

case Type.Verbose:
case OutputType.Verbose:
VerboseRecord verbose = (VerboseRecord)data.Output;
WriteVerbose(verbose.Message);
break;

case Type.Warning:
case OutputType.Warning:
WarningRecord warning = (WarningRecord)data.Output;
WriteWarning(warning.Message);
break;
Expand All @@ -167,7 +161,7 @@
private void CancelAndWait()
{
_cts.Cancel();
_worker?.Wait();
_worker?.WaitForCompletion();
}

protected override void StopProcessing() => CancelAndWait();
Expand Down
12 changes: 12 additions & 0 deletions src/PSParallelPipeline/OutputType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace PSParallelPipeline;

internal enum OutputType
{
Success,
Error,
Debug,
Information,
Progress,
Verbose,
Warning
}
27 changes: 8 additions & 19 deletions src/PSParallelPipeline/PSOutputData.cs
Original file line number Diff line number Diff line change
@@ -1,36 +1,25 @@
namespace PSParallelPipeline;

internal enum Type
{
Success,
Error,
Debug,
Information,
Progress,
Verbose,
Warning
}

internal record struct PSOutputData(Type Type, object Output)
internal record struct PSOutputData(OutputType Type, object Output)
{
internal static PSOutputData WriteObject(object sendToPipeline) =>
new(Type.Success, sendToPipeline);
new(OutputType.Success, sendToPipeline);

internal static PSOutputData WriteError(object error) =>
new(Type.Error, error);
new(OutputType.Error, error);

internal static PSOutputData WriteDebug(object debug) =>
new(Type.Debug, debug);
new(OutputType.Debug, debug);

internal static PSOutputData WriteInformation(object information) =>
new(Type.Information, information);
new(OutputType.Information, information);

internal static PSOutputData WriteProgress(object progress) =>
new(Type.Progress, progress);
new(OutputType.Progress, progress);

internal static PSOutputData WriteVerbose(object verbose) =>
new(Type.Verbose, verbose);
new(OutputType.Verbose, verbose);

internal static PSOutputData WriteWarning(object warning) =>
new(Type.Warning, warning);
new(OutputType.Warning, warning);
}
70 changes: 44 additions & 26 deletions src/PSParallelPipeline/PSTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,42 +9,66 @@ namespace PSParallelPipeline;

internal sealed class PSTask
{
private const string SetVariableCommand = "Set-Variable";

private const string DollarUnderbar = "_";

private const string StopParsingOp = "--%";

private readonly PowerShell _powershell;

private readonly PSDataStreams _internalStreams;

private readonly RunspacePool _pool;
private Runspace? _runspace;

private PSOutputStreams OutputStreams { get => _pool.Streams; }
private readonly PSOutputStreams _outputStreams;

private Runspace Runspace
{
get => _powershell.Runspace;
set => _powershell.Runspace = value;
}
private readonly CancellationToken _token;

private readonly RunspacePool _pool;

private PSTask(RunspacePool pool)
{
_powershell = PowerShell.Create();
_internalStreams = _powershell.Streams;
_outputStreams = pool.Streams;
_token = pool.Token;
_pool = pool;
}

static internal async Task<PSTask> CreateAsync(
static internal PSTask Create(
object? input,
RunspacePool runspacePool,
TaskSettings settings)
{
PSTask ps = new(runspacePool);
SetStreams(ps._internalStreams, runspacePool.Streams);
ps.Runspace = await runspacePool.GetRunspaceAsync();

return ps
.AddInput(input)
.AddScript(settings.Script)
.AddUsingStatements(settings.UsingStatements);
}

internal async Task InvokeAsync()
{
try
{
using CancellationTokenRegistration _ = _token.Register(Cancel);
_runspace = await _pool.GetRunspaceAsync().ConfigureAwait(false);
_powershell.Runspace = _runspace;
await InvokePowerShellAsync(_powershell, _outputStreams.Success).ConfigureAwait(false);
}
catch (Exception exception)
{
_outputStreams.AddOutput(exception.CreateProcessingTaskError(this));
}
finally
{
CompleteTask();
}
}

private static void SetStreams(
PSDataStreams streams,
PSOutputStreams outputStreams)
Expand All @@ -69,8 +93,8 @@ private PSTask AddInput(object? inputObject)
if (inputObject is not null)
{
_powershell
.AddCommand("Set-Variable", useLocalScope: true)
.AddArgument("_")
.AddCommand(SetVariableCommand, useLocalScope: true)
.AddArgument(DollarUnderbar)
.AddArgument(inputObject);
}

Expand All @@ -87,33 +111,27 @@ private PSTask AddUsingStatements(Dictionary<string, object?> usingParams)
{
if (usingParams.Count > 0)
{
_powershell.AddParameter("--%", usingParams);
_powershell.AddParameter(StopParsingOp, usingParams);
}

return this;
}

internal async Task InvokeAsync()
private void CompleteTask()
{
try
{
using CancellationTokenRegistration _ = _pool.RegisterCancellation(Cancel);
await InvokePowerShellAsync(_powershell, OutputStreams.Success);
}
catch (Exception exception)
{
OutputStreams.AddOutput(exception.CreateProcessingTaskError(this));
}
finally
_powershell.Dispose();
if (!_token.IsCancellationRequested && _runspace is not null)
{
_powershell.Dispose();
_pool.PushRunspace(Runspace);
_pool.PushRunspace(_runspace);
return;
}

_runspace?.Dispose();
}

private void Cancel()
{
_powershell.Dispose();
Runspace.Dispose();
_runspace?.Dispose();
}
}
15 changes: 11 additions & 4 deletions src/PSParallelPipeline/PoolSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

namespace PSParallelPipeline;

internal record struct PoolSettings(
int MaxRunspaces,
bool UseNewRunspace,
InitialSessionState InitialSessionState);
internal class PoolSettings(
int maxRunspaces,
bool useNewRunspace,
InitialSessionState initialSessionState)
{
internal int MaxRunspaces { get; } = maxRunspaces;

internal bool UseNewRunspace { get; } = useNewRunspace;

internal InitialSessionState InitialSessionState { get; } = initialSessionState;
}
37 changes: 15 additions & 22 deletions src/PSParallelPipeline/RunspacePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,68 +8,61 @@ namespace PSParallelPipeline;

internal sealed class RunspacePool : IDisposable
{
private readonly CancellationToken _token;
private readonly SemaphoreSlim _semaphore;

private readonly InitialSessionState _iss;
private readonly PoolSettings _settings;

private readonly ConcurrentQueue<Runspace> _pool = [];

private readonly ConcurrentDictionary<Guid, Runspace> _created;
private bool UseNewRunspace { get => _settings.UseNewRunspace; }

private readonly bool _useNew;
internal int MaxRunspaces { get => _settings.MaxRunspaces; }

private readonly SemaphoreSlim _semaphore;
internal CancellationToken Token { get; }

internal PSOutputStreams Streams { get; }

internal int MaxRunspaces { get; }

internal RunspacePool(
PoolSettings settings,
PSOutputStreams streams,
CancellationToken token)
{
(MaxRunspaces, _useNew, _iss) = settings;
Streams = streams;
_token = token;
Token = token;
_settings = settings;
_semaphore = new SemaphoreSlim(MaxRunspaces, MaxRunspaces);
_created = new ConcurrentDictionary<Guid, Runspace>(
Environment.ProcessorCount,
MaxRunspaces);
}

internal void PushRunspace(Runspace runspace)
{
if (_token.IsCancellationRequested)
if (Token.IsCancellationRequested)
{
return;
}

if (_useNew)
if (UseNewRunspace)
{
runspace.Dispose();
_created.TryRemove(runspace.InstanceId, out _);
runspace = CreateRunspace();
}

_pool.Enqueue(runspace);
_semaphore.Release();
}

internal CancellationTokenRegistration RegisterCancellation(Action callback) =>
_token.Register(callback);

private Runspace CreateRunspace()
{
Runspace rs = RunspaceFactory.CreateRunspace(_iss);
_created[rs.InstanceId] = rs;
Runspace rs = RunspaceFactory.CreateRunspace(_settings.InitialSessionState);
rs.Open();
return rs;
}

internal async Task<Runspace> GetRunspaceAsync()
{
await _semaphore.WaitAsync(_token);
await _semaphore
.WaitAsync(Token)
.ConfigureAwait(false);

if (_pool.TryDequeue(out Runspace runspace))
{
return runspace;
Expand All @@ -80,7 +73,7 @@ internal async Task<Runspace> GetRunspaceAsync()

public void Dispose()
{
foreach (Runspace runspace in _created.Values)
foreach (Runspace runspace in _pool)
{
runspace.Dispose();
}
Expand Down
Loading