Skip to content

Commit 8083a24

Browse files
committed
Add wakeup
1 parent 65bb782 commit 8083a24

File tree

6 files changed

+409
-4
lines changed

6 files changed

+409
-4
lines changed

last_plan.md

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ Loop_Mailbox :: struct($T: typeid) {
181181

182182
---
183183

184-
### Stage 4 — wakeup/ package (WakeUper interface + sema impl) ← NEXT
184+
### Stage 4 — wakeup/ package (WakeUper interface + sema impl) ✓ DONE (Session 74)
185185

186186
**Claim**: Separate the wakeup mechanism from the queue.
187187

@@ -228,7 +228,7 @@ init_loop_mailbox :: proc(m: ^Loop_Mailbox($T), w: wakeup.WakeUper) -> Loop_Mail
228228

229229
---
230230

231-
### Stage 5 — Pool WakeUper
231+
### Stage 5 — Pool WakeUper ← NEXT
232232

233233
**Purpose**:
234234
Add optional `wakeup.WakeUper` to the pool so event-loop callers can be notified
@@ -257,7 +257,7 @@ Pool :: struct($T: typeid) {
257257
**-vet workaround**: add `@(private) _PoolWaker :: wakeup.WakeUper` to `pool.odin`.
258258

259259
**Files changed**:
260-
- `pool/pool.odin` — add `waker`, \`empty_was_returned\`; update `init`/`get`/`put`/`destroy`
260+
- `pool/pool.odin` — add `waker`, `empty_was_returned`; update `init`/`get`/`put`/`destroy`
261261
- `pool_tests/pool_test.odin` — add tests: WakeUper wakes on put, waker.close on destroy
262262

263263
**Design doc**: `design/loop-mbox-enhancement.md` — Stage 5 section.
@@ -358,6 +358,25 @@ Details TBD when we reach this stage.
358358

359359
---
360360

361+
## Edge Cases & Missing Tests Strategy
362+
363+
To keep `queue_test.odin` and `wakeup_test.odin` clean as "runner of examples" and basic unit tests, we will create separate files for edge cases and stress tests.
364+
365+
### 1. MPSC Edge Cases (`mpsc/edge_test.odin`)
366+
- **Concurrent Push Stress**: 10 threads pushing 10,000 items each while 1 thread pops. Verify total count and no lost items.
367+
- **Stall State Handling**: A test that attempts to hit the Vyukov stall window (where `pop` returns `nil` but `len > 0`) under high contention.
368+
- **Stub Recycling Verification**: Test specifically targeting the logic path where the stub sentinel is recycled when one item remains.
369+
- **Length Consistency**: Verify `length()` accurately reflects atomic increments/decrements even if `pop` temporarily stalls.
370+
- **Missed Test**: `test_pop_all_drains_to_zero` — ensures that repeated pops from a multi-item queue eventually return `nil` and `length()` is 0.
371+
372+
### 2. WakeUper Edge Cases (`wakeup/edge_test.odin`)
373+
- **Concurrent Wake Signals**: 10 threads calling `wake()` at the same time on one `sema_wakeup`. Verify the semaphore count matches the number of calls.
374+
- **Wake after Close Protection**: Define and test the behavior when `wake()` is called after `close()`. (Recommendation: document as undefined/illegal, but provide a safe-fail if possible).
375+
- **Custom WakeUper**: Implement a simple dummy `WakeUper` in the test to verify the interface works without `sema_wakeup`.
376+
- **Missed Test**: `test_ctx_persistence` — check that the `rawptr` passed to `wake` and `close` is bit-for-bit identical to the one provided during creation.
377+
378+
---
379+
361380
## Critical files
362381

363382
| File | Stage | Action |
@@ -368,9 +387,11 @@ Details TBD when we reach this stage.
368387
| `mpsc/queue.odin` | 3 | NEW |
369388
| `mpsc/doc.odin` | 3 | NEW |
370389
| `mpsc/queue_test.odin` | 3 | NEW (unit tests) |
371-
| `wakeup/wakeup.odin` | 4 | NEW |
390+
| `mpsc/edge_test.odin` | 3 | NEW (edge + stress tests) |
391+
| `wakeup/wakeup.odin` | 4 | NEW + nil-check guards in _sema_wake/_sema_close |
372392
| `wakeup/doc.odin` | 4 | NEW |
373393
| `wakeup/wakeup_test.odin` | 4 | NEW (unit tests) |
394+
| `wakeup/edge_test.odin` | 4 | NEW (edge + concurrent tests) |
374395
| `pool/pool.odin` | 5 | Modify (add waker, empty_was_returned) |
375396
| `pool_tests/pool_test.odin` | 5 | Add WakeUper tests |
376397
| `mbox.odin` → `mbox/mbox.odin` | 6 | MOVE |

mpsc/edge_test.odin

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package mpsc
2+
3+
import "core:testing"
4+
import "core:thread"
5+
6+
// ----------------------------------------------------------------------------
7+
// Edge cases and stress tests
8+
// ----------------------------------------------------------------------------
9+
10+
// test_stub_recycling_explicit exercises the stub-recycling path in pop.
// That path runs when exactly one item remains (head == tail, next == nil).
// Each push/pop cycle of a single item triggers it.
@(test)
test_stub_recycling_explicit :: proc(t: ^testing.T) {
	q: Queue(_Test_Msg)
	init(&q)

	// Repeat single-item cycles so the recycling branch fires every round.
	for round in 0 ..< 5 {
		item := _Test_Msg{data = round}
		push(&q, &item)

		popped := pop(&q)
		got_back := popped != nil && popped.data == round
		testing.expectf(t, got_back, "round %d: pop should return the pushed message", round)
		testing.expectf(t, length(&q) == 0, "round %d: length should be 0 after pop", round)
	}
}
25+
26+
// test_pop_all_drains_to_zero pushes N messages then pops until the queue is empty.
// Verifies all messages are received and length reaches zero.
@(test)
test_pop_all_drains_to_zero :: proc(t: ^testing.T) {
	q: Queue(_Test_Msg)
	init(&q)

	N :: 50
	msgs: [N]_Test_Msg
	for i in 0 ..< N {
		msgs[i].data = i
		push(&q, &msgs[i])
	}

	// Single-consumer drain. The attempt bound makes a lost message fail the
	// test instead of spinning forever (the original loop could hang), while
	// still tolerating transient nil returns from the Vyukov stall window.
	MAX_ATTEMPTS :: N * 100
	count := 0
	for attempt := 0; count < N && attempt < MAX_ATTEMPTS; attempt += 1 {
		if pop(&q) != nil {
			count += 1
		}
	}

	testing.expect(t, count == N, "should drain all pushed messages")
	testing.expect(t, length(&q) == 0, "length should be 0 after full drain")
}
53+
54+
// _Stress_Ctx passes queue and message slice to each producer thread.
// Each producer gets its own disjoint slice of preallocated messages, so
// producers never write to the same message memory.
@(private)
_Stress_Ctx :: struct {
	q: ^Queue(_Test_Msg), // shared MPSC queue all producers push into
	msgs: []_Test_Msg, // this producer's private, preallocated messages
}

// Stress-test dimensions: number of producer threads and items per producer.
_STRESS_PRODUCERS :: 10
_STRESS_ITEMS_PER_PROD :: 1000
63+
64+
// test_concurrent_push_stress runs _STRESS_PRODUCERS threads each pushing
// _STRESS_ITEMS_PER_PROD messages. The main thread consumes all of them.
// Verifies no messages are lost and length reaches zero.
@(test)
test_concurrent_push_stress :: proc(t: ^testing.T) {
	q: Queue(_Test_Msg)
	init(&q)

	total :: _STRESS_PRODUCERS * _STRESS_ITEMS_PER_PROD

	// One backing array for all messages; each producer pushes pointers into
	// its own disjoint sub-slice, so there is no write sharing between threads.
	msgs := make([]_Test_Msg, total)
	defer delete(msgs)

	ctxs := make([]_Stress_Ctx, _STRESS_PRODUCERS)
	defer delete(ctxs)

	for i in 0 ..< _STRESS_PRODUCERS {
		ctxs[i] = _Stress_Ctx {
			q = &q,
			msgs = msgs[i * _STRESS_ITEMS_PER_PROD:(i + 1) * _STRESS_ITEMS_PER_PROD],
		}
	}

	threads := make([dynamic]^thread.Thread, 0, _STRESS_PRODUCERS)
	defer delete(threads)

	for i in 0 ..< _STRESS_PRODUCERS {
		// &ctxs[i] stays valid for the thread's lifetime: ctxs outlives the
		// join loop below.
		th := thread.create_and_start_with_poly_data(
			&ctxs[i],
			proc(ctx: ^_Stress_Ctx) {
				for j in 0 ..< len(ctx.msgs) {
					push(ctx.q, &ctx.msgs[j])
				}
			},
		)
		append(&threads, th)
	}

	// Consume until all messages are received.
	// NOTE(review): this is a busy-spin that consumes concurrently with the
	// producers (deliberate — it exercises push/pop contention). If any
	// message were lost the loop would hang rather than fail; acceptable for
	// a stress test run under a test-runner timeout.
	received := 0
	for received < total {
		if pop(&q) != nil {
			received += 1
		}
	}

	// Join only after draining: producers are guaranteed finished once all
	// `total` messages have been observed, so join cannot block indefinitely.
	for th in threads {
		thread.join(th)
		thread.destroy(th)
	}

	testing.expect(t, received == total, "should receive all pushed messages")
	testing.expect(t, length(&q) == 0, "length should be 0 after full drain")
}
118+
119+
// test_length_consistency checks that the length counter tracks operations
// exactly: it equals N after N pushes and returns to zero after a full
// single-threaded drain, matching the number of successful pops.
@(test)
test_length_consistency :: proc(t: ^testing.T) {
	q: Queue(_Test_Msg)
	init(&q)

	N :: 200
	backing: [N]_Test_Msg
	for idx in 0 ..< N {
		push(&q, &backing[idx])
	}

	testing.expect(t, length(&q) == N, "length should equal number of pushes")

	// Drain until pop reports empty, counting every successful pop.
	drained := 0
	for {
		if pop(&q) == nil {
			break
		}
		drained += 1
	}

	testing.expect(t, drained == N, "should pop exactly N messages")
	testing.expect(t, length(&q) == 0, "length should be 0 after draining")
}

wakeup/doc.odin

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
Package wakeup provides a wake-up callback interface for event loops.

WakeUper is a value type with three fields: ctx, wake, close.
It is copyable. The zero value is valid — callers must check wake != nil before calling.

sema_wakeup returns a WakeUper backed by a semaphore.
Use it in non-nbio loops and unit tests.

Usage:

	w := wakeup.sema_wakeup()
	defer w.close(w.ctx)

	// From another thread:
	w.wake(w.ctx)

	// In the event loop — wait for wake:
	// (depends on your loop mechanism)

Call close when done. close frees internal resources.
*/
package wakeup

wakeup/edge_test.odin

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package wakeup
2+
3+
import "core:sync"
4+
import "core:testing"
5+
import "core:thread"
6+
import "core:time"
7+
8+
// ----------------------------------------------------------------------------
9+
// Edge cases and concurrent tests
10+
// ----------------------------------------------------------------------------
11+
12+
// test_wake_with_nil_ctx verifies that _sema_wake and _sema_close
// are safe no-ops when ctx is nil.
@(test)
test_wake_with_nil_ctx :: proc(t: ^testing.T) {
	// Both procs must simply return when handed a nil context; the order of
	// the calls is irrelevant since neither touches any state.
	_sema_close(nil) // must not crash
	_sema_wake(nil)  // must not crash
	testing.expect(t, true, "nil ctx calls should not crash")
}
20+
21+
// Number of threads that concurrently signal one WakeUper.
_CONCURRENT_WAKERS :: 10

// test_concurrent_wake_signals starts _CONCURRENT_WAKERS threads that each
// call wake once. Verifies the semaphore receives all signals.
@(test)
test_concurrent_wake_signals :: proc(t: ^testing.T) {
	w := sema_wakeup()

	threads := make([dynamic]^thread.Thread, 0, _CONCURRENT_WAKERS)
	defer delete(threads)

	for _ in 0 ..< _CONCURRENT_WAKERS {
		// Sharing &w across threads is safe here: the threads only read the
		// WakeUper fields, and every thread is joined before w is used again.
		th := thread.create_and_start_with_poly_data(
			&w,
			proc(w: ^WakeUper) {
				w.wake(w.ctx)
			},
		)
		append(&threads, th)
	}

	for th in threads {
		thread.join(th)
		thread.destroy(th)
	}

	// Drain the semaphore: each successful timed wait consumes one posted
	// wake signal. The timeout bounds the final (failing) wait so the loop
	// terminates once all signals are consumed.
	state := (^_Sema_State)(w.ctx)
	count := 0
	for sync.sema_wait_with_timeout(&state.sema, 10 * time.Millisecond) {
		count += 1
	}

	testing.expect(t, count == _CONCURRENT_WAKERS, "all wake signals should be received")
	w.close(w.ctx)
}
56+
57+
// _Custom_State holds flags set by a custom WakeUper's wake and close procs.
@(private)
_Custom_State :: struct {
	fired: bool,
	closed: bool,
}

// test_custom_wakeup builds a WakeUper from raw procs (no semaphore).
// Verifies the interface works with any implementation.
@(test)
test_custom_wakeup :: proc(t: ^testing.T) {
	flags := _Custom_State{}

	// Hand-rolled callbacks: each just flips the corresponding flag.
	mark_fired := proc(ctx: rawptr) {(^_Custom_State)(ctx).fired = true}
	mark_closed := proc(ctx: rawptr) {(^_Custom_State)(ctx).closed = true}

	w := WakeUper {
		ctx = rawptr(&flags),
		wake = mark_fired,
		close = mark_closed,
	}

	testing.expect(t, !flags.fired, "should not be fired before wake")
	testing.expect(t, !flags.closed, "should not be closed before close")

	w.wake(w.ctx)
	testing.expect(t, flags.fired, "should be fired after wake")

	w.close(w.ctx)
	testing.expect(t, flags.closed, "should be closed after close")
}
85+
86+
// test_ctx_persistence verifies that ctx stored in WakeUper points at a live
// _Sema_State whose stored allocator is valid.
@(test)
test_ctx_persistence :: proc(t: ^testing.T) {
	w := sema_wakeup()
	state := (^_Sema_State)(w.ctx)

	// The original check `rawptr(state) == w.ctx` was a tautology — state is
	// derived from w.ctx, so it could never fail. Assert something
	// falsifiable instead: ctx is non-nil (the allocation happened) and the
	// state it points to carries a usable allocator for _sema_close.
	testing.expect(t, w.ctx != nil, "ctx should point to _Sema_State")
	testing.expect(t, state.allocator.procedure != nil, "stored allocator should be valid")

	w.close(w.ctx)
}

wakeup/wakeup.odin

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2026 g41797
2+
// SPDX-License-Identifier: MIT
3+
4+
package wakeup
5+
6+
import "core:mem"
7+
import "core:sync"
8+
9+
// WakeUper is a wake-up callback. Value type — copyable.
// wake and close do NOT use #contextless — callers may use context.logger.
//
// Call wake to signal the consumer that work is available.
// Call close when done — it frees internal resources.
// The zero value is valid. Callers must check wake != nil before calling.
WakeUper :: struct {
	ctx: rawptr, // opaque implementation state, passed to wake and close
	wake: proc(rawptr), // signals the consumer; nil in the zero value
	close: proc(rawptr), // frees internal resources; call once when done
}
20+
21+
// _Sema_State is the heap-allocated backing state for sema_wakeup's WakeUper.
// It records the allocator it was created with so _sema_close can free it.
@(private)
_Sema_State :: struct {
	sema: sync.Sema, // zero value is valid — no init or destroy needed
	allocator: mem.Allocator, // allocator used in sema_wakeup; needed by _sema_close to free this state
}
26+
27+
// sema_wakeup returns a WakeUper backed by a semaphore.
28+
// Useful for non-nbio loops and unit tests.
29+
// Call waker.close(waker.ctx) when done to free resources.
30+
sema_wakeup :: proc(allocator := context.allocator) -> WakeUper {
31+
state := new(_Sema_State, allocator)
32+
state.allocator = allocator
33+
return WakeUper{ctx = rawptr(state), wake = _sema_wake, close = _sema_close}
34+
}
35+
36+
// _sema_wake posts one permit on the backing semaphore.
// A nil ctx (zero-value WakeUper misuse) is a safe no-op.
@(private)
_sema_wake :: proc(ctx: rawptr) {
	state := (^_Sema_State)(ctx)
	if state != nil {
		sync.sema_post(&state.sema)
	}
}
44+
45+
// _sema_close releases the _Sema_State using the allocator recorded at
// creation time. A nil ctx is a safe no-op.
@(private)
_sema_close :: proc(ctx: rawptr) {
	state := (^_Sema_State)(ctx)
	if state != nil {
		free(state, state.allocator)
	}
}

0 commit comments

Comments
 (0)