Skip to content

Commit a4bc0e1

Browse files
authored
fix(api): handle expired asset metadata in AssetRefWriter (#469)
* fix(api): handle expired asset metadata in AssetRefWriter to prevent infinite retry loops

  The flushProject function treated all Redis errors identically, restoring pending counts for retry. When metadata keys expired (redis.Nil), this created an infinite error loop since the metadata would never reappear.

  Two fixes:
  1. Enqueue now refreshes the meta key TTL on every call, preventing premature expiration while pending counts are still accumulating.
  2. flushProject distinguishes redis.Nil (drop orphaned entry) from transient errors (restore for retry).

* fix(ci): prevent E2E test hanging by adding teardown and flush context

  The E2E workflow hangs after tests pass because docker compose waits for Postgres to stop, but dangling connections prevent clean shutdown. Add a docker compose down teardown step as a safety net.

  Also pass context to AssetRefWriter.flush() so Redis operations respect timeouts instead of using context.Background().
1 parent 08a91cf commit a4bc0e1

3 files changed

Lines changed: 88 additions & 6 deletions

File tree

.github/workflows/e2e-test.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,9 @@ jobs:
4747
run: |
4848
cd src/server
4949
docker compose -f docker-compose.test.yml logs
50+
51+
- name: Teardown
52+
if: always()
53+
run: |
54+
cd src/server
55+
docker compose -f docker-compose.test.yml down --timeout 10

src/server/api/go/internal/infra/assetrefwriter/writer.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ func (w *AssetRefWriter) Enqueue(ctx context.Context, projectID uuid.UUID, asset
112112
continue
113113
}
114114
pipe.SetNX(ctx, metaKey, metaJSON, defaultKeyTTL)
115+
// Always refresh meta key TTL so it doesn't expire before the pending count is flushed
116+
pipe.Expire(ctx, metaKey, defaultKeyTTL)
115117
}
116118

117119
// Mark project as dirty
@@ -136,18 +138,20 @@ func (w *AssetRefWriter) loop() {
136138
for {
137139
select {
138140
case <-ticker.C:
139-
w.flush()
141+
ctx, cancel := context.WithTimeout(context.Background(), w.interval)
142+
w.flush(ctx)
143+
cancel()
140144
case <-w.stopCh:
141-
// Final flush before exit
142-
w.flush()
145+
// Final flush before exit — use background context;
146+
// Close() enforces the overall deadline via its own select.
147+
w.flush(context.Background())
143148
return
144149
}
145150
}
146151
}
147152

148153
// flush pops dirty projects and flushes each one.
149-
func (w *AssetRefWriter) flush() {
150-
ctx := context.Background()
154+
func (w *AssetRefWriter) flush(ctx context.Context) {
151155

152156
for {
153157
// SPOP one project at a time to avoid holding too many in memory
@@ -222,8 +226,15 @@ func (w *AssetRefWriter) flushProject(ctx context.Context, projectID uuid.UUID)
222226
metaKey := metaKeyPrefix + pid + ":" + e.sha256
223227
metaJSON, err := w.redis.Get(ctx, metaKey).Bytes()
224228
if err != nil {
229+
if err == redis.Nil {
230+
// Metadata expired or was never written — drop the orphaned pending entry
231+
// to avoid infinite retry loops. The next real Enqueue will recreate it.
232+
w.log.Warn("asset meta expired, dropping orphaned pending entry",
233+
zap.String("key", metaKey), zap.Int64("lost_count", e.count))
234+
continue
235+
}
225236
w.log.Error("get asset meta", zap.String("key", metaKey), zap.Error(err))
226-
// Restore count to Redis since we can't flush without metadata
237+
// Restore count to Redis since we can't flush without metadata (transient error)
227238
w.redis.HIncrBy(ctx, pendingKey, e.sha256, e.count)
228239
w.redis.SAdd(ctx, dirtySetKey, pid)
229240
continue

src/server/api/go/internal/infra/assetrefwriter/writer_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,71 @@ func TestFlush_MultipleProjects(t *testing.T) {
299299
assert.Equal(t, 2, countByProject[p2])
300300
}
301301

302+
func TestFlush_MissingMetadata_DropsOrphanedEntry(t *testing.T) {
303+
rdb := newTestRedis(t)
304+
ctx := context.Background()
305+
mockRepo := &mockAssetReferenceRepo{}
306+
log := zap.NewNop()
307+
308+
w := New(rdb, mockRepo, log, WithFlushInterval(time.Hour))
309+
w.Start()
310+
311+
projectID := uuid.New()
312+
pid := projectID.String()
313+
pendingKey := pendingKeyPrefix + pid
314+
315+
// Manually create a pending entry WITHOUT metadata (simulates expired meta key)
316+
rdb.HSet(ctx, pendingKey, "orphan_sha256", "3")
317+
rdb.SAdd(ctx, dirtySetKey, pid)
318+
319+
// Also enqueue a valid asset
320+
validAsset := model.Asset{SHA256: "valid_sha256", S3Key: "s3/valid", Bucket: "b", MIME: "application/json", SizeB: 100}
321+
require.NoError(t, w.Enqueue(ctx, projectID, []model.Asset{validAsset}))
322+
323+
// Close triggers flush
324+
w.Close(ctx)
325+
326+
// The valid asset should have been flushed to DB
327+
require.Len(t, mockRepo.calls, 1)
328+
assert.Equal(t, 1, len(mockRepo.calls[0].Assets))
329+
assert.Equal(t, "valid_sha256", mockRepo.calls[0].Assets[0].SHA256)
330+
331+
// The orphaned entry should NOT be restored to Redis
332+
count, err := rdb.HLen(ctx, pendingKey).Result()
333+
require.NoError(t, err)
334+
assert.Equal(t, int64(0), count, "orphaned pending entry should be dropped, not restored")
335+
}
336+
337+
func TestEnqueue_RefreshesMetaTTL(t *testing.T) {
338+
rdb := newTestRedis(t)
339+
ctx := context.Background()
340+
mockRepo := &mockAssetReferenceRepo{}
341+
log := zap.NewNop()
342+
343+
w := New(rdb, mockRepo, log, WithFlushInterval(time.Hour))
344+
w.Start()
345+
defer w.Close(ctx)
346+
347+
projectID := uuid.New()
348+
asset := model.Asset{SHA256: "aaa", S3Key: "s3/aaa", Bucket: "b"}
349+
350+
// First enqueue
351+
require.NoError(t, w.Enqueue(ctx, projectID, []model.Asset{asset}))
352+
353+
metaKey := metaKeyPrefix + projectID.String() + ":aaa"
354+
355+
// Reduce TTL artificially to simulate time passing
356+
rdb.Expire(ctx, metaKey, 1*time.Second)
357+
358+
// Second enqueue should refresh TTL
359+
require.NoError(t, w.Enqueue(ctx, projectID, []model.Asset{asset}))
360+
361+
ttl, err := rdb.TTL(ctx, metaKey).Result()
362+
require.NoError(t, err)
363+
// TTL should have been refreshed to ~24h, not the 1s we set
364+
assert.Greater(t, ttl, 1*time.Minute, "meta key TTL should be refreshed on re-enqueue")
365+
}
366+
302367
func TestMetadata_SETNXIdempotent(t *testing.T) {
303368
rdb := newTestRedis(t)
304369
ctx := context.Background()

0 commit comments

Comments
 (0)