Skip to content

Commit ad26c4e

Browse files
committed
Updated code to be compatible with php-etl/pipeline-contracts:0.5 and php-etl/satellite-contracts:0.1
1 parent 7a6a1de commit ad26c4e

7 files changed

+129
-155
lines changed

composer.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
"php-etl/pipeline-contracts": "0.5.*",
2222
"php-etl/satellite-contracts": "0.1.*",
2323
"php-etl/pipeline": "*",
24-
"php-etl/console-state": "*",
2524
"php-etl/pipeline-console-runtime": "*",
2625
"php-etl/action-console-runtime": "*",
2726
"php-etl/action": "*",

composer.lock

Lines changed: 61 additions & 125 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/ActionProxy.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
use Kiboko\Component\Action\Action;
88
use Kiboko\Component\Runtime\Action\Console as ActionConsoleRuntime;
9-
use Kiboko\Component\State;
109
use Kiboko\Contract\Satellite\CodeInterface;
1110
use Kiboko\Contract\Satellite\RunnableInterface;
1211
use Symfony\Component\Console\Output\ConsoleOutput;
@@ -19,7 +18,7 @@ class ActionProxy implements RunnableInterface
1918
public function __construct(
2019
callable $factory,
2120
private readonly ConsoleOutput $output,
22-
private readonly State\StateOutput\Workflow $state,
21+
private readonly Workflow $state,
2322
private readonly CodeInterface $code,
2423
) {
2524
$this->queuedCalls[] = static function (ActionConsoleRuntime $runtime) use ($factory): void {

src/Console.php

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,24 @@
44

55
namespace Kiboko\Component\Runtime\Workflow;
66

7-
use Kiboko\Component\Action\Action;
8-
use Kiboko\Component\Pipeline\Pipeline;
97
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
10-
use Kiboko\Component\State;
118
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
129
use Kiboko\Contract\Satellite\CodeInterface;
1310
use Kiboko\Contract\Satellite\RunnableInterface;
14-
use Kiboko\Contract\Satellite\RunnableInterface as JobRunnableInterface;
1511
use Symfony\Component\Console\Output\ConsoleOutput;
1612

1713
final class Console implements WorkflowRuntimeInterface
1814
{
19-
private readonly State\StateOutput\Workflow $state;
15+
private readonly Workflow $state;
2016

21-
/** @var list<JobRunnableInterface> */
17+
/** @var list<RunnableInterface> */
2218
private array $jobs = [];
2319

2420
public function __construct(
2521
private readonly ConsoleOutput $output,
2622
private readonly PipelineRunnerInterface $pipelineRunner,
2723
) {
28-
$this->state = new State\StateOutput\Workflow($output);
24+
$this->state = new Workflow($output);
2925
}
3026

3127
public function loadPipeline(CodeInterface $job, string $filename): PipelineRuntimeInterface
@@ -42,18 +38,18 @@ public function loadAction(CodeInterface $job, string $filename): RunnableInterf
4238
return new ActionProxy($factory, $this->output, $this->state, $job);
4339
}
4440

45-
public function job(JobRunnableInterface $job): self
41+
public function job(CodeInterface $job, RunnableInterface $runnable): self
4642
{
47-
$this->jobs[] = $job;
43+
$this->jobs[(string) $job] = [$job, $runnable];
4844

4945
return $this;
5046
}
5147

5248
public function run(int $interval = 1000): int
5349
{
5450
$count = 0;
55-
foreach ($this->jobs as $job) {
56-
$count = $job->run($interval);
51+
foreach ($this->jobs as [$job, $runnable]) {
52+
$count = $runnable->run($interval);
5753
}
5854

5955
return $count;

src/PipelineProxy.php

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66

77
use Kiboko\Component\Pipeline\Pipeline;
88
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
9+
use Kiboko\Component\Runtime\Pipeline\MemoryState;
910
use Kiboko\Component\Runtime\Pipeline\PipelineRuntimeInterface;
10-
use Kiboko\Component\State;
1111
use Kiboko\Contract\Pipeline\ExtractorInterface;
1212
use Kiboko\Contract\Pipeline\LoaderInterface;
1313
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
1414
use Kiboko\Contract\Pipeline\RejectionInterface;
1515
use Kiboko\Contract\Pipeline\StateInterface;
16+
use Kiboko\Contract\Pipeline\StepCodeInterface;
17+
use Kiboko\Contract\Pipeline\StepRejectionInterface;
18+
use Kiboko\Contract\Pipeline\StepStateInterface;
1619
use Kiboko\Contract\Pipeline\TransformerInterface;
1720
use Kiboko\Contract\Satellite\CodeInterface;
1821
use Symfony\Component\Console\Output\ConsoleOutput;
@@ -26,7 +29,7 @@ public function __construct(
2629
callable $factory,
2730
private readonly ConsoleOutput $output,
2831
private readonly PipelineRunnerInterface $pipelineRunner,
29-
private readonly State\StateOutput\Workflow $state,
32+
private readonly Workflow $state,
3033
private readonly CodeInterface $code,
3134
) {
3235
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($factory): void {
@@ -35,36 +38,39 @@ public function __construct(
3538
}
3639

3740
public function extract(
41+
StepCodeInterface $step,
3842
ExtractorInterface $extractor,
39-
RejectionInterface $rejection,
40-
StateInterface $state,
43+
StepRejectionInterface $rejection,
44+
StepStateInterface $state,
4145
): self {
42-
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($extractor, $rejection, $state): void {
43-
$runtime->extract($extractor, $rejection, $state);
46+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($step, $extractor, $rejection, $state): void {
47+
$runtime->extract($step, $extractor, $rejection, $state);
4448
};
4549

4650
return $this;
4751
}
4852

4953
public function transform(
54+
StepCodeInterface $step,
5055
TransformerInterface $transformer,
51-
RejectionInterface $rejection,
52-
StateInterface $state,
56+
StepRejectionInterface $rejection,
57+
StepStateInterface $state,
5358
): self {
54-
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($transformer, $rejection, $state): void {
55-
$runtime->transform($transformer, $rejection, $state);
59+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($step, $transformer, $rejection, $state): void {
60+
$runtime->transform($step, $transformer, $rejection, $state);
5661
};
5762

5863
return $this;
5964
}
6065

6166
public function load(
67+
StepCodeInterface $step,
6268
LoaderInterface $loader,
63-
RejectionInterface $rejection,
64-
StateInterface $state,
69+
StepRejectionInterface $rejection,
70+
StepStateInterface $state,
6571
): self {
66-
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($loader, $rejection, $state): void {
67-
$runtime->load($loader, $rejection, $state);
72+
$this->queuedCalls[] = static function (PipelineConsoleRuntime $runtime) use ($step, $loader, $rejection, $state): void {
73+
$runtime->load($step, $loader, $rejection, $state);
6874
};
6975

7076
return $this;
@@ -73,7 +79,7 @@ public function load(
7379
public function run(int $interval = 1000): int
7480
{
7581
$state = $this->state->withPipeline((string) $this->code);
76-
$pipeline = new Pipeline($this->pipelineRunner, new State\MemoryState());
82+
$pipeline = new Pipeline($this->pipelineRunner, new MemoryState());
7783

7884
$runtime = new PipelineConsoleRuntime($this->output, $pipeline, $state);
7985

src/Workflow.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Kiboko\Component\Runtime\Workflow;
6+
7+
use Kiboko\Component\Runtime\Action\Action;
8+
use Kiboko\Component\Runtime\Pipeline\Pipeline;
9+
use Symfony\Component\Console\Output\ConsoleOutput;
10+
11+
final class Workflow
12+
{
13+
/** @var list<Action|Pipeline> */
14+
private array $jobs = [];
15+
private string $index = 'A';
16+
17+
public function __construct(
18+
private readonly ConsoleOutput $output,
19+
) {
20+
}
21+
22+
public function withPipeline(string $label): Pipeline
23+
{
24+
return $this->jobs[] = new Pipeline($this->output, $this->index++, $label);
25+
}
26+
27+
public function withAction(string $label): Action
28+
{
29+
return $this->jobs[] = new Action($this->output, $this->index++, $label);
30+
}
31+
32+
public function update(): void
33+
{
34+
foreach ($this->jobs as $job) {
35+
$job->update();
36+
}
37+
}
38+
}

src/WorkflowRuntimeInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Kiboko\Component\Runtime\Workflow;
66

7-
use Kiboko\Contract\Pipeline\SchedulingInterface;
87
use Kiboko\Contract\Satellite\RunnableInterface;
8+
use Kiboko\Contract\Satellite\SchedulingInterface;
99

1010
interface WorkflowRuntimeInterface extends SchedulingInterface, RunnableInterface {}

0 commit comments

Comments
 (0)