Skip to content

Commit 746c65a

Browse files
committed
stress test http batch
1 parent 477c7b6 commit 746c65a

File tree

1 file changed

+113
-0
lines changed

1 file changed

+113
-0
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package issues
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"fmt"
7+
"github.com/ClickHouse/clickhouse-go/v2"
8+
"github.com/ClickHouse/clickhouse-go/v2/tests"
9+
"github.com/stretchr/testify/require"
10+
"strconv"
11+
"strings"
12+
"sync"
13+
"sync/atomic"
14+
"testing"
15+
)
16+
17+
func TestStressHTTPBatchConcurrency(t *testing.T) {
18+
t.Skip("intense test for local debugging")
19+
20+
env, err := tests.GetTestEnvironment("issues")
21+
require.NoError(t, err)
22+
useSSL, err := strconv.ParseBool(tests.GetEnv("CLICKHOUSE_USE_SSL", "false"))
23+
require.NoError(t, err)
24+
port := env.HttpPort
25+
var tlsConfig *tls.Config
26+
if useSSL {
27+
tlsConfig = &tls.Config{}
28+
port = env.HttpsPort
29+
}
30+
conn, err := tests.GetConnectionWithOptions(&clickhouse.Options{
31+
Protocol: clickhouse.HTTP,
32+
Addr: []string{fmt.Sprintf("%s:%d", env.Host, port)},
33+
Auth: clickhouse.Auth{
34+
Database: "default",
35+
Username: env.Username,
36+
Password: env.Password,
37+
},
38+
Debug: true,
39+
Debugf: func(format string, v ...any) {
40+
t.Logf(format, v...)
41+
},
42+
Compression: &clickhouse.Compression{
43+
Method: clickhouse.CompressionLZ4,
44+
},
45+
TLS: tlsConfig,
46+
MaxIdleConns: 10,
47+
MaxOpenConns: 10, // intense concurrency
48+
})
49+
defer conn.Close()
50+
51+
err = conn.Exec(context.Background(), `CREATE TABLE http_batch_issue (x String) ENGINE Memory`)
52+
require.NoError(t, err)
53+
defer func() {
54+
err := conn.Exec(context.Background(), "DROP TABLE http_batch_issue")
55+
if err != nil {
56+
t.Log("failed drop table", err)
57+
}
58+
}()
59+
60+
var totalCount atomic.Int64
61+
62+
doBatchInsert := func(id, batchID int, conn clickhouse.Conn) error {
63+
ctx := context.Background()
64+
t.Log(id, "PreparingBatch")
65+
batch, err := conn.PrepareBatch(ctx, "INSERT INTO http_batch_issue")
66+
if err != nil {
67+
return fmt.Errorf("prepare: %w", err)
68+
}
69+
defer func() {
70+
err := batch.Close()
71+
if err != nil {
72+
t.Log("failed batch close", err)
73+
}
74+
}()
75+
76+
for i := 0; i < 5_000; i++ {
77+
totalCount.Add(1)
78+
str := strings.Repeat("longlongstring", 100)
79+
err := batch.Append(fmt.Sprintf("%s=%d;", str, i))
80+
if err != nil {
81+
return fmt.Errorf("append: %w", err)
82+
}
83+
}
84+
85+
count := batch.Rows()
86+
err = batch.Send()
87+
if err != nil {
88+
return fmt.Errorf("send: %w", err)
89+
}
90+
91+
t.Logf("[%d] inserted %d for batch %d", id, count, batchID)
92+
93+
return nil
94+
}
95+
96+
var wg sync.WaitGroup
97+
for i := 0; i < 10; i++ {
98+
wg.Add(1)
99+
go func() {
100+
defer wg.Done()
101+
for j := 0; j < 100; j++ {
102+
err := doBatchInsert(i, j, conn)
103+
if err != nil {
104+
t.Fatal("index", i, "failed insert for batch", j, err)
105+
}
106+
}
107+
}()
108+
}
109+
110+
wg.Wait()
111+
112+
t.Log("Total rows:", totalCount.Load())
113+
}

0 commit comments

Comments
 (0)