Skip to content

Commit ce4ae98

Browse files
committed
Added timeout to get from pool.
1 parent a1212b8 commit ce4ae98

File tree

8 files changed

+254
-11
lines changed

8 files changed

+254
-11
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ Both are thread-safe. Both have zero allocations for sending or receiving.
104104
| [Interrupt](examples/interrupt.odin) | How to wake a waiting thread without sending a message. |
105105
| [Close](examples/close.odin) | Stop the game and get back all unprocessed messages. |
106106
| [Master](examples/master.odin) | Pool + mailbox owned by one struct. Coordinated shutdown. |
107+
| [Pool Wait](examples/pool_wait.odin) | N players share M tokens (M < N); players wait for a recycled token. |
108+
109+
See the [Pool section](#pool) below for message recycling.
107110

108111
---
109112

@@ -214,6 +217,7 @@ To reuse messages, use the `pool` package.
214217
```odin
215218
import pool_pkg "path/to/odin-mbox/pool"
216219
import "core:mem"
220+
import "core:time"
217221
218222
// Your struct — both fields required when using pool.
219223
My_Msg :: struct {
@@ -229,10 +233,14 @@ if ok, _ := pool_pkg.init(&p, initial_msgs = 64, max_msgs = 256, reset = nil); !
229233
}
230234
231235
// Sender: get from pool, fill, send.
236+
// .Always (default): allocates new if pool empty.
232237
msg, _ := pool_pkg.get(&p)
233238
msg.data = 42
234239
mbox.send(&mb, msg)
235240
241+
// .Pool_Only + timeout: wait up to 100 ms for a recycled message.
242+
msg2, status := pool_pkg.get(&p, .Pool_Only, 100 * time.Millisecond)
243+
236244
// Receiver: receive, use, return to pool.
237245
got, err := mbox.wait_receive(&mb)
238246
if err == .None { pool_pkg.put(&p, got) }

build_docs.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,7 @@ echo "--- Generating HTML ---"
1818
"$ROOT_DIR/docs/generate.sh"
1919

2020
# 3. Start local preview
21+
# Opens http://localhost:8000 — Source File links point to GitHub and require
22+
# the changes to be pushed before they resolve. Press Ctrl+C to stop.
2123
echo "--- Starting Preview ---"
2224
"$ROOT_DIR/tools/preview_docs.sh"

docs/generate.sh

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,29 @@ sed -i 's|href="/|href="./|g' index.html
3535
sed -i 's|src="/|src="./|g' index.html
3636
# Fix the library link specifically (it should point to its own subdirectory)
3737
sed -i 's|href="./odin-mbox"|href="./odin-mbox/"|g' index.html
38+
# Fix the blank root package link text
39+
sed -i 's|<a href="./odin-mbox/"></a>|<a href="./odin-mbox/">mbox</a>|g' index.html
3840

39-
# 2. Package index.html (in odin-mbox/ directory)
41+
# 2. Collection home index.html (in odin-mbox/ directory — depth 1)
4042
if [ -d "odin-mbox" ]; then
4143
sed -i 's|href="/|href="../|g' odin-mbox/index.html
4244
sed -i 's|src="/|src="../|g' odin-mbox/index.html
4345
# Fix self-links and navigation in the package page
4446
sed -i 's|href="\.\./odin-mbox"|href="../odin-mbox/"|g' odin-mbox/index.html
47+
# Fix the blank root package link text
48+
sed -i 's|<a href="../odin-mbox/"></a>|<a href="../odin-mbox/">mbox</a>|g' odin-mbox/index.html
4549
fi
4650

51+
# 3. Sub-package index.html files (in odin-mbox/*/ directory — depth 2)
52+
for subdir in odin-mbox/*/; do
53+
if [ -f "${subdir}index.html" ]; then
54+
sed -i 's|href="/|href="../../|g' "${subdir}index.html"
55+
sed -i 's|src="/|src="../../|g' "${subdir}index.html"
56+
# Fix the blank root package link text
57+
sed -i 's|<a href="../../odin-mbox/"></a>|<a href="../../odin-mbox/">mbox</a>|g' "${subdir}index.html"
58+
fi
59+
done
60+
4761
cd ..
4862

4963
rm odin-mbox.odin-doc

examples/pool_wait.odin

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package examples
2+
3+
import mbox ".."
4+
import pool_pkg "../pool"
5+
import list "core:container/intrusive/list"
6+
import "core:sync"
7+
import "core:thread"
8+
9+
N_PLAYERS :: 6
10+
M_TOKENS :: 2 // fewer tokens than players — forces waiting
11+
ROUNDS :: 5
12+
13+
// pool_wait_example shows N players sharing M tokens (M < N).
14+
// Players must wait (pool.get .Pool_Only, timeout=-1) until a token is returned.
15+
pool_wait_example :: proc() -> bool {
16+
p: pool_pkg.Pool(Msg)
17+
if ok, _ := pool_pkg.init(&p, initial_msgs = M_TOKENS, max_msgs = M_TOKENS, reset = nil); !ok {
18+
return false
19+
}
20+
21+
mb: mbox.Mailbox(Msg)
22+
done: sync.Sema
23+
24+
// Collector: receives all messages and returns each token to pool.
25+
thread.run_with_poly_data3(
26+
&mb, &p, &done,
27+
proc(mb: ^mbox.Mailbox(Msg), p: ^pool_pkg.Pool(Msg), done: ^sync.Sema) {
28+
total :: N_PLAYERS * ROUNDS
29+
count := 0
30+
for count < total {
31+
msg, err := mbox.wait_receive(mb)
32+
if err == .Closed {
33+
break
34+
}
35+
if err == .None {
36+
pool_pkg.put(p, msg)
37+
count += 1
38+
}
39+
}
40+
sync.sema_post(done)
41+
},
42+
)
43+
44+
// N_PLAYERS players: each waits for a token, then sends it.
45+
// Only M_TOKENS tokens exist — excess players block in pool.get until one is returned.
46+
for _ in 0 ..< N_PLAYERS {
47+
thread.run_with_poly_data2(
48+
&mb, &p,
49+
proc(mb: ^mbox.Mailbox(Msg), p: ^pool_pkg.Pool(Msg)) {
50+
for _ in 0 ..< ROUNDS {
51+
msg, status := pool_pkg.get(p, .Pool_Only, -1)
52+
if status == .Closed {
53+
break
54+
}
55+
mbox.send(mb, msg)
56+
}
57+
},
58+
)
59+
}
60+
61+
sync.sema_wait(&done)
62+
63+
remaining, _ := mbox.close(&mb)
64+
for node := list.pop_front(&remaining); node != nil; node = list.pop_front(&remaining) {
65+
msg := container_of(node, Msg, "node")
66+
pool_pkg.put(&p, msg)
67+
}
68+
pool_pkg.destroy(&p)
69+
return true
70+
}

pool/doc.odin

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ Your struct must have two fields required by the pool where clause:
2929
Status returns:
3030
- init returns (bool, Pool_Status): (true, .Ok) on success; (false, .Out_Of_Memory) on pre-alloc failure.
3131
- get returns (^T, Pool_Status): .Ok, .Pool_Empty, .Out_Of_Memory, or .Closed.
32+
With .Pool_Only strategy and timeout parameter:
33+
- timeout==0 (default): return immediately if empty (.Pool_Empty). Non-blocking.
34+
- timeout<0: wait forever until put or destroy.
35+
- timeout>0: wait up to that duration; returns (nil, .Pool_Empty) on expiry.
36+
- Returns (nil, .Closed) if pool is destroyed while waiting.
3237
- put returns ^T: nil if the message was recycled or freed; the original pointer if it was foreign
3338
(msg.allocator != pool allocator — caller must free it).
3439

pool/pool.odin

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import "base:intrinsics"
77
import list "core:container/intrusive/list"
88
import "core:mem"
99
import "core:sync"
10+
import "core:time"
1011

11-
// _PoolNode, _PoolMutex, _PoolAllocator, _PoolEvent ensure imports are used — required by -vet for generic code.
12+
// _PoolNode, _PoolMutex, _PoolAllocator, _PoolEvent, _PoolDuration ensure imports are used — required by -vet for generic code.
1213
@(private)
1314
_PoolNode :: list.Node
1415
@(private)
@@ -17,6 +18,8 @@ _PoolMutex :: sync.Mutex
1718
_PoolAllocator :: mem.Allocator
1819
@(private)
1920
_PoolEvent :: Pool_Event
21+
@(private)
22+
_PoolDuration :: time.Duration
2023

2124
// Pool_State is the internal lifecycle of a pool.
2225
Pool_State :: enum {
@@ -52,6 +55,7 @@ Allocation_Strategy :: enum {
5255
Pool :: struct($T: typeid) {
5356
allocator: mem.Allocator,
5457
mutex: sync.Mutex,
58+
cond: sync.Cond, // wakes waiting get(.Pool_Only) calls
5559
list: list.List,
5660
curr_msgs: int,
5761
max_msgs: int, // 0 = unlimited
@@ -102,13 +106,16 @@ init :: proc(
102106
}
103107

104108
// get returns a message from the free-list.
105-
// .Always (default): allocates a new one if the pool is empty.
106-
// .Pool_Only: returns (nil, .Pool_Empty) if the pool is empty.
107-
// Returns (nil, .Closed) if the pool state is not Active.
109+
// .Always (default): allocates a new one if the pool is empty. timeout is ignored.
110+
// .Pool_Only + timeout==0: returns (nil, .Pool_Empty) immediately if empty (default behavior).
111+
// .Pool_Only + timeout<0: waits forever until put or destroy.
112+
// .Pool_Only + timeout>0: waits up to that duration; returns (nil, .Pool_Empty) on expiry.
113+
// Returns (nil, .Closed) if the pool state is not Active (including destroy while waiting).
108114
// Sets msg.allocator on every returned message. Calls reset(.Get) only for recycled messages.
109115
get :: proc(
110116
p: ^Pool($T),
111117
strategy := Allocation_Strategy.Always,
118+
timeout: time.Duration = 0,
112119
) -> (^T, Pool_Status) where intrinsics.type_has_field(T, "node"),
113120
intrinsics.type_field_type(T, "node") == list.Node,
114121
intrinsics.type_has_field(T, "allocator"),
@@ -121,6 +128,36 @@ get :: proc(
121128
}
122129

123130
raw := list.pop_front(&p.list)
131+
if raw == nil && strategy == .Pool_Only {
132+
if timeout == 0 {
133+
sync.mutex_unlock(&p.mutex)
134+
return nil, .Pool_Empty
135+
}
136+
// Wait loop: block until a message is available, pool is closed, or timeout expires.
137+
for p.list.head == nil {
138+
if p.state != .Active {
139+
sync.mutex_unlock(&p.mutex)
140+
return nil, .Closed
141+
}
142+
ok: bool
143+
if timeout < 0 {
144+
sync.cond_wait(&p.cond, &p.mutex)
145+
ok = true
146+
} else {
147+
ok = sync.cond_wait_with_timeout(&p.cond, &p.mutex, timeout)
148+
}
149+
if p.state != .Active {
150+
sync.mutex_unlock(&p.mutex)
151+
return nil, .Closed
152+
}
153+
if !ok {
154+
sync.mutex_unlock(&p.mutex)
155+
return nil, .Pool_Empty // timeout expired
156+
}
157+
}
158+
raw = list.pop_front(&p.list)
159+
}
160+
124161
if raw != nil {
125162
p.curr_msgs -= 1
126163
alloc := p.allocator
@@ -134,12 +171,7 @@ get :: proc(
134171
return msg, .Ok
135172
}
136173

137-
if strategy == .Pool_Only {
138-
sync.mutex_unlock(&p.mutex)
139-
return nil, .Pool_Empty
140-
}
141-
142-
// Fresh allocation — do not call reset.
174+
// strategy == .Always and pool was empty: fresh allocation — do not call reset.
143175
alloc := p.allocator
144176
sync.mutex_unlock(&p.mutex)
145177

@@ -187,6 +219,7 @@ put :: proc(
187219
msg.node = {}
188220
list.push_back(&p.list, &msg.node)
189221
p.curr_msgs += 1
222+
sync.cond_signal(&p.cond) // wake one waiting get(.Pool_Only)
190223
sync.mutex_unlock(&p.mutex)
191224
return nil
192225
}
@@ -220,5 +253,6 @@ destroy :: proc(
220253
p.curr_msgs -= 1
221254
free(msg, alloc)
222255
}
256+
sync.cond_broadcast(&p.cond) // wake all waiting get(.Pool_Only) calls
223257
sync.mutex_unlock(&p.mutex)
224258
}

pool_tests/pool_test.odin

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ package pool_tests
22

33
import list "core:container/intrusive/list"
44
import "core:mem"
5+
import "core:sync"
56
import "core:testing"
7+
import "core:thread"
8+
import "core:time"
69

710
import pool_pkg "../pool"
811

@@ -394,3 +397,105 @@ test_pool_reset_on_put :: proc(t: ^testing.T) {
394397
free(recycled, recycled.allocator)
395398
}
396399
}
400+
401+
// ----------------------------------------------------------------------------
402+
// Timeout tests
403+
// ----------------------------------------------------------------------------
404+
405+
@(test)
406+
test_pool_get_timeout_zero :: proc(t: ^testing.T) {
407+
p: pool_pkg.Pool(Test_Msg)
408+
pool_pkg.init(&p, reset = nil)
409+
defer pool_pkg.destroy(&p)
410+
411+
// Empty pool, .Pool_Only, timeout=0 — must return immediately with .Pool_Empty.
412+
msg, status := pool_pkg.get(&p, .Pool_Only, 0)
413+
testing.expect(t, msg == nil, "msg should be nil")
414+
testing.expect(t, status == .Pool_Empty, "status should be .Pool_Empty")
415+
}
416+
417+
@(test)
418+
test_pool_get_timeout_elapsed :: proc(t: ^testing.T) {
419+
p: pool_pkg.Pool(Test_Msg)
420+
pool_pkg.init(&p, reset = nil)
421+
defer pool_pkg.destroy(&p)
422+
423+
// Empty pool, .Pool_Only, short timeout — nobody puts, should expire with .Pool_Empty.
424+
msg, status := pool_pkg.get(&p, .Pool_Only, time.Millisecond)
425+
testing.expect(t, msg == nil, "msg should be nil after timeout")
426+
testing.expect(t, status == .Pool_Empty, "status should be .Pool_Empty after timeout")
427+
}
428+
429+
// _put_wakes_ctx holds shared state for test_pool_get_timeout_put_wakes.
430+
_Put_Wakes_Ctx :: struct {
431+
pool: ^pool_pkg.Pool(Test_Msg),
432+
msg: ^Test_Msg,
433+
ready: sync.Sema,
434+
}
435+
436+
@(test)
437+
test_pool_get_timeout_put_wakes :: proc(t: ^testing.T) {
438+
p: pool_pkg.Pool(Test_Msg)
439+
pool_pkg.init(&p, reset = nil)
440+
defer pool_pkg.destroy(&p)
441+
442+
// Pre-allocate a message to put back from the second thread.
443+
msg, _ := pool_pkg.get(&p)
444+
testing.expect(t, msg != nil, "initial get should return non-nil")
445+
if msg == nil {
446+
return
447+
}
448+
449+
ctx := _Put_Wakes_Ctx{pool = &p, msg = msg}
450+
451+
th := thread.create_and_start_with_data(&ctx, proc(data: rawptr) {
452+
c := (^_Put_Wakes_Ctx)(data)
453+
// Signal the waiter that we're ready, then put the message back.
454+
sync.sema_post(&c.ready)
455+
time.sleep(5 * time.Millisecond)
456+
pool_pkg.put(c.pool, c.msg)
457+
})
458+
459+
// Wait until the thread is running, then block on get with a long timeout.
460+
sync.sema_wait(&ctx.ready)
461+
got, status := pool_pkg.get(&p, .Pool_Only, time.Second)
462+
thread.join(th)
463+
thread.destroy(th)
464+
465+
testing.expect(t, got != nil, "get should return non-nil after put wakes it")
466+
testing.expect(t, status == .Ok, "status should be .Ok")
467+
if got != nil {
468+
free(got, got.allocator)
469+
}
470+
}
471+
472+
// _destroy_wakes_ctx holds shared state for test_pool_get_timeout_destroy_wakes.
473+
_Destroy_Wakes_Ctx :: struct {
474+
pool: ^pool_pkg.Pool(Test_Msg),
475+
ready: sync.Sema,
476+
}
477+
478+
@(test)
479+
test_pool_get_timeout_destroy_wakes :: proc(t: ^testing.T) {
480+
p: pool_pkg.Pool(Test_Msg)
481+
pool_pkg.init(&p, reset = nil)
482+
483+
ctx := _Destroy_Wakes_Ctx{pool = &p}
484+
485+
th := thread.create_and_start_with_data(&ctx, proc(data: rawptr) {
486+
c := (^_Destroy_Wakes_Ctx)(data)
487+
// Signal the waiter that we're running, then destroy the pool.
488+
sync.sema_post(&c.ready)
489+
time.sleep(5 * time.Millisecond)
490+
pool_pkg.destroy(c.pool)
491+
})
492+
493+
// Wait until the thread is running, then block on get with infinite timeout.
494+
sync.sema_wait(&ctx.ready)
495+
got, status := pool_pkg.get(&p, .Pool_Only, -1)
496+
thread.join(th)
497+
thread.destroy(th)
498+
499+
testing.expect(t, got == nil, "get should return nil when pool is destroyed")
500+
testing.expect(t, status == .Closed, "status should be .Closed")
501+
}

tests/all_test.odin

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ test_master_example :: proc(t: ^testing.T) {
4545
testing.expect(t, examples.master_example(), "master_example failed")
4646
}
4747

48+
@(test)
49+
test_pool_wait :: proc(t: ^testing.T) {
50+
testing.expect(t, examples.pool_wait_example(), "pool_wait_example failed")
51+
}
52+
4853
// --- Mailbox edge-case tests ---
4954

5055
// Msg is the local test message type.

0 commit comments

Comments
 (0)