diff --git a/db_context.go b/db_context.go index 66e0e6b..c59fb50 100644 --- a/db_context.go +++ b/db_context.go @@ -139,6 +139,10 @@ type txAdapter struct { tx *sql.Tx } +func NewTxAdapter(tx *sql.Tx) *txAdapter { + return &txAdapter{tx} +} + func (a *txAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { return a.tx.ExecContext(ctx, query, args...) } @@ -165,7 +169,7 @@ func (a *dbAdapter) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error if err != nil { return nil, err } - return &txAdapter{tx}, nil + return NewTxAdapter(tx), nil } func (a *dbAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { diff --git a/writer.go b/writer.go index 72250b3..5e098bd 100644 --- a/writer.go +++ b/writer.go @@ -20,6 +20,7 @@ type Writer struct { // that executes user-defined queries within the transaction that stores a message in the outbox. // The Writer itself commits or rolls back the transaction once the callback and the outbox insert complete. type TxWorkFunc func(ctx context.Context, txQueryer TxQueryer) error +type TxWorkFuncV2 func(ctx context.Context, txQueryer TxQueryer) (*Message, error) // WriterOption is a function that configures a Writer instance. type WriterOption func(*Writer) @@ -73,6 +74,16 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) return fmt.Errorf("failed to begin transaction: %w", err) } + var txCommitted bool + defer func() { + if !txCommitted { + _ = tx.Rollback() + } + }() + return w.WriteWithTX(ctx, tx, msg, txWorkFunc) +} + +func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFunc TxWorkFunc) error { var txCommitted bool defer func() { if !txCommitted { @@ -80,11 +91,30 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) } }() - err = txWorkFunc(ctx, tx) + err := txWorkFunc(ctx, tx) if err != nil { return fmt.Errorf("failed to execute user-defined query: %w", err) } + return w.write(ctx, tx, msg) +} +func (w *Writer) WriteWithTXV2(ctx context.Context, tx Tx, txWorkFunc TxWorkFuncV2) error { + var txCommitted bool + defer func() { + if !txCommitted { + _ = tx.Rollback() + } + }() + + msg, err := txWorkFunc(ctx, tx) + if err != nil { + return fmt.Errorf("failed to execute user-defined query: %w", err) + } + + return w.write(ctx, tx, msg) +} + +func (w *Writer) write(ctx context.Context, tx Tx, msg *Message) (err error) { query := fmt.Sprintf("INSERT INTO outbox (id, created_at, scheduled_at, metadata, payload, times_attempted) VALUES (%s, %s, %s, %s, %s, %s)", w.dbCtx.getSQLPlaceholder(1), w.dbCtx.getSQLPlaceholder(2), @@ -98,7 +128,7 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) } err = tx.Commit() - txCommitted = err == nil + txCommitted := err == nil if txCommitted && w.msgPublisher != nil { ctxWithoutCancel := context.WithoutCancel(ctx) // optimistic path is async, so we don't want to cancel the context @@ -108,7 +138,6 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc) if err != nil { return fmt.Errorf("failed to commit transaction: %w", err) } - return nil }