Skip to content

Commit cb43b00

Browse files
authored
Merge pull request #2 from QuesmaOrg/wal
Enable WAL and busy_timeout for concurrent SQLite access
2 parents a5b2d8f + c9c47e1 commit cb43b00

2 files changed

Lines changed: 124 additions & 2 deletions

File tree

main.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ func processInsertJob(job insertJob) {
8080

8181
func initDB(dbPath string) error {
8282
var err error
83-
db, err = sql.Open("sqlite", dbPath)
83+
// WAL lets readers and the writer proceed concurrently; busy_timeout
84+
// makes competing connections wait instead of failing with SQLITE_BUSY.
85+
dsn := dbPath + "?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=synchronous(NORMAL)"
86+
db, err = sql.Open("sqlite", dsn)
8487
if err != nil {
8588
return err
8689
}
@@ -763,7 +766,9 @@ func runQuery(args []string) {
763766
query := fs.Arg(0)
764767

765768
var err error
766-
db, err = sql.Open("sqlite", *dbPath)
769+
// Read-only + busy_timeout so ad-hoc queries don't contend with a
770+
// running server's writes.
771+
db, err = sql.Open("sqlite", *dbPath+"?_pragma=busy_timeout(5000)&mode=ro")
767772
if err != nil {
768773
log.Fatalf("failed to open database: %v", err)
769774
}

main_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import (
55
"context"
66
"database/sql"
77
"encoding/json"
8+
"fmt"
89
"net/http"
910
"net/http/httptest"
1011
"os"
12+
"sync"
1113
"testing"
1214
"time"
1315
)
@@ -240,3 +242,118 @@ func TestLogIngestion(t *testing.T) {
240242
t.Errorf("expected body 'Test log message', got '%s'", logBody)
241243
}
242244
}
245+
246+
// TestConcurrentReadWrite reproduces the pytest-vs-server race: a separate
247+
// DB handle reads in parallel with server-side inserts. Without WAL +
248+
// busy_timeout this fails with "database is locked".
249+
func TestConcurrentReadWrite(t *testing.T) {
250+
dbPath, _, cleanup := setupTestDB(t)
251+
defer cleanup()
252+
253+
reader, err := sql.Open("sqlite", dbPath+"?_pragma=busy_timeout(5000)&mode=ro")
254+
if err != nil {
255+
t.Fatalf("open reader: %v", err)
256+
}
257+
defer reader.Close()
258+
259+
makePayload := func(i int) []byte {
260+
p := map[string]interface{}{
261+
"resourceSpans": []interface{}{map[string]interface{}{
262+
"resource": map[string]interface{}{"attributes": []interface{}{
263+
map[string]interface{}{"key": "service.name", "value": map[string]interface{}{"stringValue": "race-svc"}},
264+
}},
265+
"scopeSpans": []interface{}{map[string]interface{}{
266+
"spans": []interface{}{map[string]interface{}{
267+
"traceId": fmt.Sprintf("%032x", i),
268+
"spanId": fmt.Sprintf("%016x", i),
269+
"name": "s",
270+
"kind": float64(1),
271+
"startTimeUnixNano": "1700000000000000000",
272+
"endTimeUnixNano": "1700000001000000000",
273+
"status": map[string]interface{}{"code": float64(0)},
274+
}},
275+
}},
276+
}},
277+
}
278+
b, _ := json.Marshal(p)
279+
return b
280+
}
281+
282+
const writers = 4
283+
const perWriter = 50
284+
stop := make(chan struct{})
285+
var wg sync.WaitGroup
286+
287+
// Writers: concurrent POSTs to the trace handler.
288+
for w := 0; w < writers; w++ {
289+
wg.Add(1)
290+
go func(w int) {
291+
defer wg.Done()
292+
for i := 0; i < perWriter; i++ {
293+
req := httptest.NewRequest(http.MethodPost, "/v1/traces",
294+
bytes.NewReader(makePayload(w*perWriter+i)))
295+
req.Header.Set("Content-Type", "application/json")
296+
rec := httptest.NewRecorder()
297+
handleTraces(rec, req)
298+
if rec.Code != http.StatusOK {
299+
t.Errorf("write %d/%d: status %d", w, i, rec.Code)
300+
return
301+
}
302+
}
303+
}(w)
304+
}
305+
306+
// Reader: hammer COUNT(*) until writers stop.
307+
var readErr error
308+
var reads int
309+
wg.Add(1)
310+
go func() {
311+
defer wg.Done()
312+
for {
313+
select {
314+
case <-stop:
315+
return
316+
default:
317+
}
318+
var n int
319+
if err := reader.QueryRow("SELECT COUNT(*) FROM traces").Scan(&n); err != nil {
320+
readErr = err
321+
return
322+
}
323+
reads++
324+
}
325+
}()
326+
327+
// Poll until the insert worker has drained everything.
328+
expected := writers * perWriter
329+
deadline := time.Now().Add(10 * time.Second)
330+
for time.Now().Before(deadline) {
331+
var n int
332+
if err := reader.QueryRow("SELECT COUNT(*) FROM traces").Scan(&n); err != nil {
333+
t.Fatalf("reader query failed mid-test: %v", err)
334+
}
335+
if n >= expected {
336+
break
337+
}
338+
time.Sleep(20 * time.Millisecond)
339+
}
340+
341+
close(stop)
342+
wg.Wait()
343+
344+
if readErr != nil {
345+
t.Fatalf("reader got error (expected WAL+busy_timeout to prevent this): %v", readErr)
346+
}
347+
if reads == 0 {
348+
t.Fatal("reader never completed a query")
349+
}
350+
351+
var final int
352+
if err := reader.QueryRow("SELECT COUNT(*) FROM traces").Scan(&final); err != nil {
353+
t.Fatalf("final read: %v", err)
354+
}
355+
if final != expected {
356+
t.Errorf("expected %d rows, got %d", expected, final)
357+
}
358+
t.Logf("completed %d concurrent reads alongside %d writes", reads, expected)
359+
}

0 commit comments

Comments
 (0)