Skip to content

Commit be948a8

Browse files
authored
Merge pull request #27 from php-etl/feature/catch-logs
catch RejectedItemExceptions and log them instead of stopping the pipeline
2 parents 715b813 + 48e3d99 commit be948a8

File tree

1 file changed

+110
-31
lines changed

1 file changed

+110
-31
lines changed

src/Builder/Transformer.php

Lines changed: 110 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,18 @@
99
use PhpParser\Builder;
1010
use PhpParser\Node;
1111

12-
final readonly class Transformer implements StepBuilderInterface
12+
final class Transformer implements StepBuilderInterface
1313
{
14-
public function __construct(private Builder|Node $mapper)
14+
private ?Node\Expr $logger = null;
15+
16+
public function __construct(private readonly Builder|Node $mapper)
1517
{
1618
}
1719

1820
public function withLogger(Node\Expr $logger): self
1921
{
22+
$this->logger = $logger;
23+
2024
return $this;
2125
}
2226

@@ -54,6 +58,15 @@ class: new Node\Stmt\Class_(
5458
),
5559
flags: Node\Stmt\Class_::MODIFIER_PRIVATE
5660
),
61+
new Node\Param(
62+
var: new Node\Expr\Variable(
63+
name: 'logger'
64+
),
65+
type: new Node\Name\FullyQualified(
66+
name: \Psr\Log\LoggerInterface::class
67+
),
68+
flags: Node\Stmt\Class_::MODIFIER_PUBLIC,
69+
),
5770
],
5871
],
5972
),
@@ -64,45 +77,108 @@ class: new Node\Stmt\Class_(
6477
'stmts' => [
6578
new Node\Stmt\Expression(
6679
new Node\Expr\Assign(
67-
var: new Node\Expr\Variable('line'),
80+
var: new Node\Expr\Variable('input'),
6881
expr: new Node\Expr\Yield_(null)
6982
),
7083
),
71-
new Node\Stmt\Do_(
72-
cond: new Node\Expr\Assign(
73-
var: new Node\Expr\Variable('line'),
74-
expr: new Node\Expr\Yield_(
75-
new Node\Expr\New_(
76-
class: new Node\Name\FullyQualified(
77-
\Kiboko\Component\Bucket\AcceptanceResultBucket::class
78-
),
79-
args: [
80-
new Node\Arg(
81-
new Node\Expr\Variable('line'),
82-
),
83-
],
84-
)
84+
new Node\Stmt\While_(
85+
cond: new Node\Expr\BinaryOp\NotIdentical(
86+
left: new Node\Expr\Variable('input'),
87+
right: new Node\Expr\ConstFetch(
88+
new Node\Name('null')
8589
)
8690
),
8791
stmts: [
88-
new Node\Stmt\Expression(
89-
new Node\Expr\Assign(
90-
var: new Node\Expr\Variable('line'),
91-
expr: new Node\Expr\FuncCall(
92-
name: new Node\Expr\PropertyFetch(
93-
var: new Node\Expr\Variable('this'),
94-
name: new Node\Identifier('mapper'),
92+
new Node\Stmt\TryCatch(
93+
stmts: [
94+
new Node\Stmt\Expression(
95+
new Node\Expr\Assign(
96+
var: new Node\Expr\Variable('line'),
97+
expr: new Node\Expr\FuncCall(
98+
name: new Node\Expr\PropertyFetch(
99+
var: new Node\Expr\Variable('this'),
100+
name: new Node\Identifier('mapper'),
101+
),
102+
args: [
103+
new Node\Arg(
104+
value: new Node\Expr\Variable('input')
105+
),
106+
new Node\Arg(
107+
value: new Node\Expr\Variable('input')
108+
),
109+
]
110+
),
95111
),
96-
args: [
97-
new Node\Arg(
98-
value: new Node\Expr\Variable('line')
112+
),
113+
],
114+
catches: [
115+
new Node\Stmt\Catch_(
116+
types: [
117+
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\RejectedItemException::class),
118+
],
119+
var: new Node\Expr\Variable('exception'),
120+
stmts: [
121+
new Node\Stmt\Expression(
122+
new Node\Expr\MethodCall(
123+
var: new Node\Expr\PropertyFetch(
124+
var: new Node\Expr\Variable('this'),
125+
name: new Node\Identifier('logger'),
126+
),
127+
name: new Node\Name('error'),
128+
args: [
129+
new Node\Arg(
130+
new Node\Expr\MethodCall(
131+
var: new Node\Expr\Variable('exception'),
132+
name: new Node\Identifier('getMessage')
133+
)
134+
),
135+
new Node\Expr\Array_([
136+
new Node\Expr\ArrayItem(
137+
value: new Node\Expr\Variable('input'),
138+
key: new Node\Scalar\String_('input')
139+
),
140+
]),
141+
]
142+
)
99143
),
100-
new Node\Arg(
101-
value: new Node\Expr\Variable('line')
144+
new Node\Stmt\Expression(
145+
new Node\Expr\Assign(
146+
var: new Node\Expr\Variable('input'),
147+
expr: new Node\Expr\Yield_(
148+
new Node\Expr\New_(
149+
class: new Node\Name\FullyQualified(
150+
\Kiboko\Component\Bucket\RejectionResultBucket::class
151+
),
152+
args: [
153+
new Node\Arg(
154+
new Node\Expr\Variable('input'),
155+
),
156+
],
157+
)
158+
)
159+
),
102160
),
161+
new Node\Stmt\Continue_(),
103162
]
104163
),
105-
),
164+
]
165+
),
166+
new Node\Stmt\Expression(
167+
new Node\Expr\Assign(
168+
var: new Node\Expr\Variable('input'),
169+
expr: new Node\Expr\Yield_(
170+
new Node\Expr\New_(
171+
class: new Node\Name\FullyQualified(
172+
\Kiboko\Component\Bucket\AcceptanceResultBucket::class
173+
),
174+
args: [
175+
new Node\Arg(
176+
new Node\Expr\Variable('line'),
177+
),
178+
],
179+
)
180+
)
181+
)
106182
),
107183
],
108184
),
@@ -114,7 +190,7 @@ class: new Node\Name\FullyQualified(
114190
),
115191
args: [
116192
new Node\Arg(
117-
new Node\Expr\Variable('line'),
193+
new Node\Expr\Variable('input'),
118194
),
119195
],
120196
),
@@ -131,6 +207,9 @@ class: new Node\Name\FullyQualified(
131207
new Node\Arg(
132208
$this->mapper instanceof Builder ? $this->mapper->getNode() : $this->mapper,
133209
),
210+
new Node\Arg(
211+
$this->logger ?? new Node\Expr\New_(new Node\Name\FullyQualified(\Psr\Log\NullLogger::class))
212+
),
134213
],
135214
);
136215
}

0 commit comments

Comments
 (0)