Skip to content

Commit 340f4ae

Browse files
authored
perf(api): buffer asset_references writes via Redis to eliminate row-lock contention (#467)
Under high concurrency (50+ concurrent learn sessions), BatchIncrementAssetRefs causes PostgreSQL row-level lock contention on the asset_references table, resulting in slow queries (avg 316ms), S3 context canceled errors, and 504s. This replaces the synchronous DB upsert in StoreMessage with an async Redis-buffered writer that: - Uses HINCRBY to atomically accumulate ref counts in Redis (µs latency) - Flushes coalesced batches to DB every 1s via a background goroutine - Falls back to synchronous DB write if Redis enqueue fails - Restores counts to Redis on DB flush failure for automatic retry - Drains remaining data on graceful shutdown The writer is stateless — any pod can flush any project's pending data, and pod crash/restart loses at most 1s of ref_count increments (which are only used for storage analytics and GC, not hard constraints).
1 parent 8720f45 commit 340f4ae

7 files changed

Lines changed: 721 additions & 24 deletions

File tree

src/server/api/go/cmd/server/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/gin-gonic/gin"
2525
"github.com/memodb-io/Acontext/internal/bootstrap"
2626
"github.com/memodb-io/Acontext/internal/config"
27+
"github.com/memodb-io/Acontext/internal/infra/assetrefwriter"
2728
"github.com/memodb-io/Acontext/internal/infra/cache"
2829
dbpkg "github.com/memodb-io/Acontext/internal/infra/db"
2930
"github.com/memodb-io/Acontext/internal/modules/handler"
@@ -138,5 +139,13 @@ func main() {
138139
if err := srv.Shutdown(ctx); err != nil {
139140
log.Sugar().Errorw("server shutdown", "err", err)
140141
}
142+
143+
// Flush buffered asset reference writes before exit
144+
if writer := do.MustInvoke[*assetrefwriter.AssetRefWriter](inj); writer != nil {
145+
if err := writer.Close(ctx); err != nil {
146+
log.Sugar().Errorw("asset ref writer shutdown", "err", err)
147+
}
148+
}
149+
141150
log.Sugar().Info("server exited")
142151
}

src/server/api/go/internal/bootstrap/container.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/memodb-io/Acontext/configs"
1111
"github.com/memodb-io/Acontext/internal/config"
12+
"github.com/memodb-io/Acontext/internal/infra/assetrefwriter"
1213
"github.com/memodb-io/Acontext/internal/infra/blob"
1314
"github.com/memodb-io/Acontext/internal/infra/cache"
1415
"github.com/memodb-io/Acontext/internal/infra/db"
@@ -215,6 +216,27 @@ func BuildContainer() *do.Injector {
215216
do.MustInvoke[*blob.S3Deps](i),
216217
), nil
217218
})
219+
220+
// AssetRefWriter — buffers asset reference increments in Redis for async batch flush
221+
do.Provide(inj, func(i *do.Injector) (*assetrefwriter.AssetRefWriter, error) {
222+
cfg := do.MustInvoke[*config.Config](i)
223+
if !cfg.AssetRefWriter.Enabled {
224+
return nil, nil
225+
}
226+
interval := time.Duration(cfg.AssetRefWriter.FlushIntervalMs) * time.Millisecond
227+
if interval <= 0 {
228+
interval = time.Second
229+
}
230+
w := assetrefwriter.New(
231+
do.MustInvoke[*redis.Client](i),
232+
do.MustInvoke[repo.AssetReferenceRepo](i),
233+
do.MustInvoke[*zap.Logger](i),
234+
assetrefwriter.WithFlushInterval(interval),
235+
)
236+
w.Start()
237+
return w, nil
238+
})
239+
218240
do.Provide(inj, func(i *do.Injector) (repo.SessionRepo, error) {
219241
return repo.NewSessionRepo(
220242
do.MustInvoke[*gorm.DB](i),
@@ -268,6 +290,7 @@ func BuildContainer() *do.Injector {
268290
do.MustInvoke[repo.SessionRepo](i),
269291
do.MustInvoke[repo.SessionEventRepo](i),
270292
do.MustInvoke[repo.AssetReferenceRepo](i),
293+
do.MustInvoke[*assetrefwriter.AssetRefWriter](i),
271294
do.MustInvoke[*zap.Logger](i),
272295
do.MustInvoke[*blob.S3Deps](i),
273296
do.MustInvoke[*mq.Publisher](i),

src/server/api/go/internal/config/config.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -101,19 +101,25 @@ type ArtifactCfg struct {
101101
MaxUploadSizeBytes int64 // Maximum file upload size in bytes
102102
}
103103

104+
type AssetRefWriterCfg struct {
105+
Enabled bool // Enable async buffered writes for asset references (default true)
106+
FlushIntervalMs int // Flush interval in milliseconds (default 1000)
107+
}
108+
104109
type Config struct {
105-
App AppCfg
106-
Root RootCfg
107-
Log LogCfg
108-
Database DBCfg
109-
Redis RedisCfg
110-
RabbitMQ MQCfg
111-
S3 S3Cfg
112-
Core CoreCfg
113-
Metrics MetricsCfg
114-
Telemetry TelemetryCfg
115-
Supabase SupabaseCfg
116-
Artifact ArtifactCfg
110+
App AppCfg
111+
Root RootCfg
112+
Log LogCfg
113+
Database DBCfg
114+
Redis RedisCfg
115+
RabbitMQ MQCfg
116+
S3 S3Cfg
117+
Core CoreCfg
118+
Metrics MetricsCfg
119+
Telemetry TelemetryCfg
120+
Supabase SupabaseCfg
121+
Artifact ArtifactCfg
122+
AssetRefWriter AssetRefWriterCfg
117123
}
118124

119125
func setDefaults(v *viper.Viper) {
@@ -151,6 +157,8 @@ func setDefaults(v *viper.Viper) {
151157
v.SetDefault("supabase.apiKey", "")
152158
v.SetDefault("supabase.authURL", "")
153159
v.SetDefault("artifact.maxUploadSizeBytes", 16777216) // Default 16MB (16 * 1024 * 1024 bytes)
160+
v.SetDefault("assetRefWriter.enabled", true)
161+
v.SetDefault("assetRefWriter.flushIntervalMs", 1000)
154162
}
155163

156164
func Load() (*Config, error) {

0 commit comments

Comments
 (0)