Skip to content

Commit c2e0d94

Browse files
authored
feat(producer): add sync pool for channel reuse (#3109)
Use a sync.Pool for the sync producer expectation channels to allow for re-use. Signed-off-by: k.torgaev <[email protected]>
1 parent 3c67885 commit c2e0d94

File tree

1 file changed

+22
-10
lines changed

1 file changed

+22
-10
lines changed

sync_producer.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ package sarama
22

33
import "sync"
44

5+
var expectationsPool = sync.Pool{
6+
New: func() interface{} {
7+
return make(chan *ProducerError, 1)
8+
},
9+
}
10+
511
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
612
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
713
// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
@@ -110,32 +116,38 @@ func verifyProducerConfig(config *Config) error {
110116
}
111117

112118
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
113-
expectation := make(chan *ProducerError, 1)
119+
expectation := expectationsPool.Get().(chan *ProducerError)
114120
msg.expectation = expectation
115121
sp.producer.Input() <- msg
116-
117-
if pErr := <-expectation; pErr != nil {
122+
pErr := <-expectation
123+
msg.expectation = nil
124+
expectationsPool.Put(expectation)
125+
if pErr != nil {
118126
return -1, -1, pErr.Err
119127
}
120128

121129
return msg.Partition, msg.Offset, nil
122130
}
123131

124132
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
125-
expectations := make(chan chan *ProducerError, len(msgs))
133+
indices := make(chan int, len(msgs))
126134
go func() {
127-
for _, msg := range msgs {
128-
expectation := make(chan *ProducerError, 1)
135+
for i, msg := range msgs {
136+
expectation := expectationsPool.Get().(chan *ProducerError)
129137
msg.expectation = expectation
130138
sp.producer.Input() <- msg
131-
expectations <- expectation
139+
indices <- i
132140
}
133-
close(expectations)
141+
close(indices)
134142
}()
135143

136144
var errors ProducerErrors
137-
for expectation := range expectations {
138-
if pErr := <-expectation; pErr != nil {
145+
for i := range indices {
146+
expectation := msgs[i].expectation
147+
pErr := <-expectation
148+
msgs[i].expectation = nil
149+
expectationsPool.Put(expectation)
150+
if pErr != nil {
139151
errors = append(errors, pErr)
140152
}
141153
}

0 commit comments

Comments
 (0)