Skip to content

Commit dccfd18

Browse files
committed
bulker: stream consumer: clear table cache on errors
1 parent b059436 commit dccfd18

File tree

1 file changed

+3
-0
lines changed

1 file changed

+3
-0
lines changed

bulkerlib/implementations/sql/autocommit_stream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ func (ps *AutoCommitStream) ConsumeMap(ctx context.Context, mp map[string]any) (
3939

4040
func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error) {
4141
defer func() {
42+
if err != nil {
43+
ps.sqlAdapter.TableHelper().ClearCached(ps.sqlAdapter.NamespaceName(ps.namespace), ps.tableName)
44+
}
4245
err = ps.postConsume(err)
4346
state = ps.state
4447
}()

0 commit comments

Comments
 (0)