Commit c69ed5a

Parallel Map resize (#170)
Adds `WithSerialResize` option for legacy-compatible resizing.
1 parent 25b422b commit c69ed5a
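
As a quick illustration of the new option, here is a minimal usage sketch. The `github.com/puzpuzpuz/xsync/v4` import path is an assumption for illustration; only `NewMap` and `WithSerialResize` come from this commit's diff:

```go
package main

import (
    "fmt"

    // Assumed module path, for illustration only.
    "github.com/puzpuzpuz/xsync/v4"
)

func main() {
    // Parallel resizing is the default; WithSerialResize opts back into
    // the single-goroutine resizing of previous versions.
    m := xsync.NewMap[string, int](xsync.WithSerialResize())
    m.Store("answer", 42)
    if v, ok := m.Load("answer"); ok {
        fmt.Println(v) // 42
    }
}
```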

1 file changed: +72 −5 lines

map.go

Lines changed: 72 additions & 5 deletions
```diff
@@ -4,6 +4,7 @@ import (
     "fmt"
     "hash/maphash"
     "math"
+    "runtime"
     "strings"
     "sync"
     "sync/atomic"
@@ -32,6 +33,8 @@ const (
     metaMask          uint64 = 0xffffffffff
     defaultMetaMasked uint64 = defaultMeta & metaMask
     emptyMetaSlot     uint8  = 0x80
+    // minimum number of buckets per goroutine during parallel resize
+    minBucketsPerGoroutine = 64
 )
 
 type mapResizeHint int
@@ -99,6 +102,7 @@ type Map[K comparable, V any] struct {
     table        atomic.Pointer[mapTable[K, V]]
     minTableLen  int
     growOnly     bool
+    serialResize bool
 }
 
 type mapTable[K comparable, V any] struct {
@@ -139,8 +143,9 @@ type entry[K comparable, V any] struct {
 
 // MapConfig defines configurable Map options.
 type MapConfig struct {
-    sizeHint int
-    growOnly bool
+    sizeHint     int
+    growOnly     bool
+    serialResize bool
 }
 
 // WithPresize configures new Map instance with capacity enough
@@ -165,6 +170,15 @@ func WithGrowOnly() func(*MapConfig) {
     }
 }
 
+// WithSerialResize enables serial resizing mode, matching the behavior of
+// previous versions. Use it in resource-constrained environments; parallel
+// resizing (the default) provides higher throughput.
+func WithSerialResize() func(*MapConfig) {
+    return func(c *MapConfig) {
+        c.serialResize = true
+    }
+}
+
 // Deprecated: use [NewMap].
 func NewMapOf[K comparable, V any](options ...func(*MapConfig)) *Map[K, V] {
     return NewMap[K, V](options...)
@@ -191,6 +205,7 @@ func NewMap[K comparable, V any](options ...func(*MapConfig)) *Map[K, V] {
     }
     m.minTableLen = len(table.buckets)
     m.growOnly = c.growOnly
+    m.serialResize = c.serialResize
     m.table.Store(table)
     return m
 }
@@ -621,9 +636,34 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
     }
     // Copy the data only if we're not clearing the map.
     if hint != mapClearHint {
-        for i := 0; i < tableLen; i++ {
-            copied := copyBucket(&table.buckets[i], newTable)
-            newTable.addSizePlain(uint64(i), copied)
+        // Resize in parallel when serialResize is disabled and the table is large enough.
+        // Derive the goroutine count from the table size and the available CPUs.
+        chunks := 1
+        if !m.serialResize && tableLen >= minBucketsPerGoroutine*2 {
+            chunks = min(tableLen/minBucketsPerGoroutine, runtime.GOMAXPROCS(0))
+            chunks = max(chunks, 1)
+        }
+        if chunks > 1 {
+            var copyWg sync.WaitGroup
+            chunkSize := (tableLen + chunks - 1) / chunks
+            for c := 0; c < chunks; c++ {
+                copyWg.Add(1)
+                go func(start, end int) {
+                    for i := start; i < end; i++ {
+                        copied := copyBucketWithDestLock[K, V](&table.buckets[i], newTable)
+                        if copied > 0 {
+                            newTable.addSize(uint64(i), copied)
+                        }
+                    }
+                    copyWg.Done()
+                }(c*chunkSize, min((c+1)*chunkSize, tableLen))
+            }
+            copyWg.Wait()
+        } else {
+            for i := 0; i < tableLen; i++ {
+                copied := copyBucket(&table.buckets[i], newTable)
+                newTable.addSizePlain(uint64(i), copied)
+            }
         }
     }
     // Publish the new table and wake up all waiters.
@@ -634,6 +674,33 @@ func (m *Map[K, V]) resize(knownTable *mapTable[K, V], hint mapResizeHint) {
     m.resizeMu.Unlock()
 }
 
+func copyBucketWithDestLock[K comparable, V any](
+    b *bucketPadded[K, V],
+    destTable *mapTable[K, V],
+) (copied int) {
+    rootb := b
+    rootb.mu.Lock()
+    for {
+        for i := 0; i < entriesPerMapBucket; i++ {
+            if e := b.entries[i].Load(); e != nil {
+                hash := maphash.Comparable(destTable.seed, e.key)
+                bidx := uint64(len(destTable.buckets)-1) & h1(hash)
+                destb := &destTable.buckets[bidx]
+                destb.mu.Lock()
+                appendToBucket(h2(hash), b.entries[i].Load(), destb)
+                destb.mu.Unlock()
+                copied++
+            }
+        }
+        if next := b.next.Load(); next == nil {
+            rootb.mu.Unlock()
+            return
+        } else {
+            b = next
+        }
+    }
+}
+
 func copyBucket[K comparable, V any](
     b *bucketPadded[K, V],
     destTable *mapTable[K, V],
```
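
A note on the hunks above: the serial path keeps the plain `addSizePlain` counter update because a single goroutine owns the whole copy, while the parallel path uses `copyBucketWithDestLock` plus the atomic `addSize`, since several goroutines may hash entries into the same destination bucket concurrently. The chunking heuristic is easy to verify in isolation; below is a standalone sketch that mirrors the committed logic (`chunkBounds` is a hypothetical helper for illustration, not part of the commit):

```go
package main

import (
    "fmt"
    "runtime"
)

// Mirrors the constant introduced by this commit.
const minBucketsPerGoroutine = 64

// chunkBounds reproduces the splitting logic from resize(): parallelize only
// when the table holds at least two chunks' worth of buckets, and cap the
// goroutine count at GOMAXPROCS so every goroutine copies at least
// minBucketsPerGoroutine buckets.
func chunkBounds(tableLen int) [][2]int {
    chunks := 1
    if tableLen >= minBucketsPerGoroutine*2 {
        chunks = min(tableLen/minBucketsPerGoroutine, runtime.GOMAXPROCS(0))
        chunks = max(chunks, 1)
    }
    chunkSize := (tableLen + chunks - 1) / chunks // ceiling division
    bounds := make([][2]int, 0, chunks)
    for c := 0; c < chunks; c++ {
        bounds = append(bounds, [2]int{c * chunkSize, min((c+1)*chunkSize, tableLen)})
    }
    return bounds
}

func main() {
    // With GOMAXPROCS=8, a 1024-bucket table splits into 8 half-open
    // ranges of 128 buckets each: [0 128] [128 256] ... [896 1024].
    fmt.Println(chunkBounds(1024))
}
```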
