Skip to content
This repository was archived by the owner on Dec 16, 2025. It is now read-only.

Commit 9bbfc3c

Browse files
authored
support/db: Support concurrent queries in a transaction (#2024)
This commit adds `Synchronized` flag to `support/db.Session`. When set to `true` and `Session` runs a transaction all `Exec*` and `Select*` methods are protected by mutex what allows running them in multiple goroutines. This is an experimental feature (see below) and not a breaking change (default is `false`). Postgres protocol does not allow sending Exec query results if previously sent Query haven't been fully read. This issue manifested itself when a PR that's sending read and write queries in multiple goroutines was merged. More info: lib/pq#81 lib/pq#635 Known limitations: * It's possible that it will not be needed if we decide to remove concurrency from ingestion pipeline (see #1983). We are adding this to unblock development of new ingestion processors with a more readable code (currently all database processors are merged into one big processor that is hard to read and test). Splitting `DatabaseProcessor` will be done in a separate PR/PRs. * During Horizon Team meeting we agreed to create a new `SynchronizedSession` struct embedding `db.Session` inside that would not be placed in a shared `support` package. The problem is that `history.Q` embeds `db.Session` inside. Changing this to `db.SessionInterface` requires updating a lot of files that create `history.Q` objects, tests and mocks. Given that we may want to revert this change in the future, the change in this commit seems to be simpler.
1 parent 5a15114 commit 9bbfc3c

File tree

4 files changed

+83
-3
lines changed

4 files changed

+83
-3
lines changed

services/horizon/internal/expingest/main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,12 @@ func NewSystem(config Config) (*System, error) {
137137
return nil, errors.Wrap(err, "error creating ledger backend")
138138
}
139139

140-
historyQ := &history.Q{config.HistorySession}
140+
// Make historySession synchronized so it can be used in the pipeline
141+
// (saving to DB in multiple goroutines at the same time).
142+
historySession := config.HistorySession.Clone()
143+
historySession.Synchronized = true
144+
145+
historyQ := &history.Q{historySession}
141146

142147
session := &ingest.LiveSession{
143148
Archive: archive,
@@ -157,7 +162,7 @@ func NewSystem(config Config) (*System, error) {
157162

158163
system := &System{
159164
session: session,
160-
historySession: config.HistorySession,
165+
historySession: historySession,
161166
historyQ: historyQ,
162167
graph: config.OrderBookGraph,
163168
retry: alwaysRetry{time.Second},

support/db/main.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ package db
1414
import (
1515
"context"
1616
"database/sql"
17+
"sync"
1718

1819
"github.com/Masterminds/squirrel"
1920
"github.com/jmoiron/sqlx"
@@ -117,7 +118,16 @@ type Session struct {
117118
// Ctx is the context in which the repo is operating under.
118119
Ctx context.Context
119120

120-
tx *sqlx.Tx
121+
// Synchronized is an EXPERIMENTAL flag that allows sending queries
122+
// concurrently in a DB tx. When set to `true` all Exec and Select methods
123+
// sent in a transaction are protected by mutex. It is not needed outside
124+
// transaction context because then all queries are sent in a separate DB
125+
// connections.
126+
// Please note that Begin, Commit and Rollback must not be run in parallel.
127+
// Also, Query methods are not available when in Synchronized transaction.
128+
Synchronized bool
129+
txMutex sync.Mutex
130+
tx *sqlx.Tx
121131
}
122132

123133
type SessionInterface interface {

support/db/session.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func (s *Session) Clone() *Session {
6767
return &Session{
6868
DB: s.DB,
6969
Ctx: s.Ctx,
70+
71+
Synchronized: s.Synchronized,
7072
}
7173
}
7274

@@ -128,6 +130,11 @@ func (s *Session) GetRaw(dest interface{}, query string, args ...interface{}) er
128130
return errors.Wrap(err, "replace placeholders failed")
129131
}
130132

133+
if s.Synchronized && s.tx != nil {
134+
s.txMutex.Lock()
135+
defer s.txMutex.Unlock()
136+
}
137+
131138
start := time.Now()
132139
err = s.conn().GetContext(s.Ctx, dest, query, args...)
133140
s.log("get", start, query, args)
@@ -197,6 +204,11 @@ func (s *Session) ExecRaw(query string, args ...interface{}) (sql.Result, error)
197204
return nil, errors.Wrap(err, "replace placeholders failed")
198205
}
199206

207+
if s.Synchronized && s.tx != nil {
208+
s.txMutex.Lock()
209+
defer s.txMutex.Unlock()
210+
}
211+
200212
start := time.Now()
201213
result, err := s.conn().ExecContext(s.Ctx, query, args...)
202214
s.log("exec", start, query, args)
@@ -238,6 +250,10 @@ func (s *Session) Query(query sq.Sqlizer) (*sqlx.Rows, error) {
238250

239251
// QueryRaw runs `query` with `args`
240252
func (s *Session) QueryRaw(query string, args ...interface{}) (*sqlx.Rows, error) {
253+
if s.Synchronized && s.tx != nil {
254+
return nil, errors.New("QueryRaw cannot be run in transaction when Synchronized is true")
255+
}
256+
241257
query, err := s.ReplacePlaceholders(query)
242258
if err != nil {
243259
return nil, errors.Wrap(err, "replace placeholders failed")
@@ -307,6 +323,11 @@ func (s *Session) SelectRaw(
307323
return errors.Wrap(err, "replace placeholders failed")
308324
}
309325

326+
if s.Synchronized && s.tx != nil {
327+
s.txMutex.Lock()
328+
defer s.txMutex.Unlock()
329+
}
330+
310331
start := time.Now()
311332
err = s.conn().SelectContext(s.Ctx, dest, query, args...)
312333
s.log("select", start, query, args)

support/db/session_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,57 @@ package db
22

33
import (
44
"context"
5+
"fmt"
6+
"sync"
57
"testing"
68

79
"github.com/stellar/go/support/db/dbtest"
810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/require"
1012
)
1113

14+
func TestConcurrentQueriesTransaction(t *testing.T) {
15+
db := dbtest.Postgres(t).Load(testSchema)
16+
defer db.Close()
17+
18+
sess := &Session{
19+
Ctx: context.Background(),
20+
DB: db.Open(),
21+
// This test would fail for `Synchronized: false`
22+
Synchronized: true,
23+
}
24+
defer sess.DB.Close()
25+
26+
err := sess.Begin()
27+
assert.NoError(t, err)
28+
29+
var wg sync.WaitGroup
30+
for i := 0; i < 1000; i++ {
31+
wg.Add(1)
32+
go func(i int) {
33+
istr := fmt.Sprintf("%d", i)
34+
var err2 error
35+
if i%3 == 0 {
36+
var names []string
37+
err2 = sess.SelectRaw(&names, "SELECT name FROM people")
38+
} else if i%3 == 1 {
39+
var name string
40+
err2 = sess.GetRaw(&name, "SELECT name FROM people LIMIT 1")
41+
} else {
42+
_, err2 = sess.ExecRaw(
43+
"INSERT INTO people (name, hunger_level) VALUES ('bartek" + istr + "', " + istr + ")",
44+
)
45+
}
46+
assert.NoError(t, err2)
47+
wg.Done()
48+
}(i)
49+
}
50+
51+
wg.Wait()
52+
err = sess.Rollback()
53+
assert.NoError(t, err)
54+
}
55+
1256
func TestSession(t *testing.T) {
1357
db := dbtest.Postgres(t).Load(testSchema)
1458
defer db.Close()

0 commit comments

Comments
 (0)