Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion db_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ type txAdapter struct {
tx *sql.Tx
}

func NewTxAdapter(tx *sql.Tx) *txAdapter {
return &txAdapter{tx}
}

func (a *txAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
return a.tx.ExecContext(ctx, query, args...)
}
Expand All @@ -165,7 +169,7 @@ func (a *dbAdapter) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error
if err != nil {
return nil, err
}
return &txAdapter{tx}, nil
return NewTxAdapter(tx), nil
}

func (a *dbAdapter) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
Expand Down
35 changes: 32 additions & 3 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Writer struct {
// that executes user-defined queries within the transaction that stores a message in the outbox.
// The Writer itself commits or rolls back the transaction once the callback and the outbox insert complete.
type TxWorkFunc func(ctx context.Context, txQueryer TxQueryer) error
type TxWorkFuncV2 func(ctx context.Context, txQueryer TxQueryer) (*Message, error)

// WriterOption is a function that configures a Writer instance.
type WriterOption func(*Writer)
Expand Down Expand Up @@ -73,18 +74,47 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc)
return fmt.Errorf("failed to begin transaction: %w", err)
}

var txCommitted bool
defer func() {
if !txCommitted {
_ = tx.Rollback()
}
}()
return w.WriteWithTX(ctx, tx, msg, txWorkFunc)
}

func (w *Writer) WriteWithTX(ctx context.Context, tx Tx, msg *Message, txWorkFunc TxWorkFunc) error {
var txCommitted bool
defer func() {
if !txCommitted {
_ = tx.Rollback()
}
}()

err = txWorkFunc(ctx, tx)
err := txWorkFunc(ctx, tx)
if err != nil {
return fmt.Errorf("failed to execute user-defined query: %w", err)
}
return w.write(ctx, tx, msg)
}

func (w *Writer) WriteWithTXV2(ctx context.Context, tx Tx, txWorkFunc TxWorkFuncV2) error {
var txCommitted bool
defer func() {
if !txCommitted {
_ = tx.Rollback()
}
}()

msg, err := txWorkFunc(ctx, tx)
if err != nil {
return fmt.Errorf("failed to execute user-defined query: %w", err)
}

return w.write(ctx, tx, msg)
}

func (w *Writer) write(ctx context.Context, tx Tx, msg *Message) (err error) {
query := fmt.Sprintf("INSERT INTO outbox (id, created_at, scheduled_at, metadata, payload, times_attempted) VALUES (%s, %s, %s, %s, %s, %s)",
w.dbCtx.getSQLPlaceholder(1),
w.dbCtx.getSQLPlaceholder(2),
Expand All @@ -98,7 +128,7 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc)
}

err = tx.Commit()
txCommitted = err == nil
txCommitted := err == nil

if txCommitted && w.msgPublisher != nil {
ctxWithoutCancel := context.WithoutCancel(ctx) // optimistic path is async, so we don't want to cancel the context
Expand All @@ -108,7 +138,6 @@ func (w *Writer) Write(ctx context.Context, msg *Message, txWorkFunc TxWorkFunc)
if err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

Expand Down