From 1fea6c1911148b71c9494ee8473cf150da8757e6 Mon Sep 17 00:00:00 2001 From: Andrey Golikov Date: Mon, 24 Nov 2025 16:54:34 +0300 Subject: [PATCH 1/3] update --- .idea/modules.xml | 8 ++++++++ .idea/outbox.iml | 9 +++++++++ .idea/workspace.xml | 4 ++++ db_context.go | 6 +++++- writer.go | 40 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 .idea/modules.xml create mode 100644 .idea/outbox.iml create mode 100644 .idea/workspace.xml diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..48fc8b0 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/outbox.iml b/.idea/outbox.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/outbox.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..1ac928b --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,4 @@ + + + {} + \ No newline at end of file diff --git a/db_context.go b/db_context.go index 66e0e6b..c59fb50 100644 --- a/db_context.go +++ b/db_context.go @@ -139,6 +139,10 @@ type txAdapter struct { tx *sql.Tx } +func NewTxAdapter(tx *sql.Tx) *txAdapter { + return &txAdapter{tx} +} + func (a *txAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { return a.tx.ExecContext(ctx, query, args...) } @@ -165,7 +169,7 @@ func (a *dbAdapter) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error if err != nil { return nil, err } - return &txAdapter{tx}, nil + return NewTxAdapter(tx), nil } func (a *dbAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { diff --git a/writer.go b/writer.go index 72250b3..f8831b5 100644 --- a/writer.go +++ b/writer.go @@ -112,6 +112,46 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) return nil } +func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFunc TxWorkFunc) error { + var txCommitted bool + defer func() { + if !txCommitted { + _ = tx.Rollback() + } + }() + + err := txWorkFunc(ctx, tx) + if err != nil { + return fmt.Errorf("failed to execute user-defined query: %w", err) + } + + query := fmt.Sprintf("INSERT INTO outbox (id, created_at, scheduled_at, metadata, payload, times_attempted) VALUES (%s, %s, %s, %s, %s, %s)", + w.dbCtx.getSQLPlaceholder(1), + w.dbCtx.getSQLPlaceholder(2), + w.dbCtx.getSQLPlaceholder(3), + w.dbCtx.getSQLPlaceholder(4), + w.dbCtx.getSQLPlaceholder(5), + w.dbCtx.getSQLPlaceholder(6)) + _, err = tx.ExecContext(ctx, query, w.dbCtx.formatMessageIDForDB(msg), msg.CreatedAt, msg.ScheduledAt, msg.Metadata, msg.Payload, 0) + if err != nil { + return fmt.Errorf("failed to store message in outbox: %w", err) + } + + err = tx.Commit() + txCommitted = err == nil + + if txCommitted && w.msgPublisher != nil { + ctxWithoutCancel := context.WithoutCancel(ctx) // optimistic path is async, so we don't want to cancel the context + go w.publishMessage(ctxWithoutCancel, msg) + } + + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + func (w *Writer) publishMessage(ctx context.Context, msg *Message) { ctx, cancel := context.WithTimeout(ctx, w.optimisticTimeout) defer cancel() From ddfe1b512b66d57efa29402192f124b55a3f2e04 Mon Sep 17 00:00:00 2001 From: Andrey Golikov Date: Mon, 24 Nov 2025 16:56:26 +0300 Subject: [PATCH 2/3] add WriteWithTX --- db_context.go | 6 +++++- writer.go | 12 +++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/db_context.go b/db_context.go index 66e0e6b..c59fb50 100644 --- a/db_context.go +++ b/db_context.go @@ -139,6 +139,10 @@ type txAdapter struct { tx *sql.Tx } +func NewTxAdapter(tx *sql.Tx) *txAdapter { + return &txAdapter{tx} +} + func (a *txAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { return a.tx.ExecContext(ctx, query, args...) } @@ -165,7 +169,7 @@ func (a *dbAdapter) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error if err != nil { return nil, err } - return &txAdapter{tx}, nil + return NewTxAdapter(tx), nil } func (a *dbAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { diff --git a/writer.go b/writer.go index 72250b3..cd0f21c 100644 --- a/writer.go +++ b/writer.go @@ -73,6 +73,16 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) return fmt.Errorf("failed to begin transaction: %w", err) } + var txCommitted bool + defer func() { + if !txCommitted { + _ = tx.Rollback() + } + }() + return w.WriteWithTX(ctx, tx, msg, txWorkFunc) +} + +func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFunc TxWorkFunc) error { var txCommitted bool defer func() { if !txCommitted { @@ -80,7 +90,7 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) } }() - err = txWorkFunc(ctx, tx) + err := txWorkFunc(ctx, tx) if err != nil { return fmt.Errorf("failed to execute user-defined query: %w", err) } From 99225b1bb8d01e3e5f1d858f48d7a801f60171c9 Mon Sep 17 00:00:00 2001 From: Andrey Golikov Date: Mon, 24 Nov 2025 19:53:17 +0300 Subject: [PATCH 3/3] add WriteWithTXV2 --- writer.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/writer.go b/writer.go index cd0f21c..5e098bd 100644 --- a/writer.go +++ b/writer.go @@ -20,6 +20,7 @@ type Writer struct { // that executes user-defined queries within the transaction that stores a message in the outbox. // The Writer itself commits or rolls back the transaction once the callback and the outbox insert complete. type TxWorkFunc func(ctx context.Context, txQueryer TxQueryer) error +type TxWorkFuncV2 func(ctx context.Context, txQueryer TxQueryer) (*Message, error) // WriterOption is a function that configures a Writer instance. type WriterOption func(*Writer) @@ -94,7 +95,26 @@ func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFun if err != nil { return fmt.Errorf("failed to execute user-defined query: %w", err) } + return w.write(ctx, tx, msg) +} + +func (w *Writer) WriteWithTXV2(ctx context.Context, tx Tx, txWorkFunc TxWorkFuncV2) error { + var txCommitted bool + defer func() { + if !txCommitted { + _ = tx.Rollback() + } + }() + msg, err := txWorkFunc(ctx, tx) + if err != nil { + return fmt.Errorf("failed to execute user-defined query: %w", err) + } + + return w.write(ctx, tx, msg) +} + +func (w *Writer) write(ctx context.Context, tx Tx, msg *Message) (err error) { query := fmt.Sprintf("INSERT INTO outbox (id, created_at, scheduled_at, metadata, payload, times_attempted) VALUES (%s, %s, %s, %s, %s, %s)", w.dbCtx.getSQLPlaceholder(1), w.dbCtx.getSQLPlaceholder(2), @@ -108,7 +128,7 @@ func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFun } err = tx.Commit() - txCommitted = err == nil + txCommitted := err == nil if txCommitted && w.msgPublisher != nil { ctxWithoutCancel := context.WithoutCancel(ctx) // optimistic path is async, so we don't want to cancel the context @@ -118,7 +138,6 @@ func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFun if err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } - return nil }