Description
Describe the bug
I'm testing the experimental Streaming Bulk feature in OpenSearch 2.17.0 by writing a streaming indexer in Go.
My approach is to asynchronously open a connection to the _bulk/stream endpoint, write a chunk of bytes containing the usual bulk action line and document body, and then wait until a bulk response is streamed back.
Everything works as expected with simple requests (i.e. small documents, index operations only). However, when testing the indexer against a real service, I hit a case where a request is sent and its response is never received.
I've reproduced this with a small program (see below). The issue seems to be triggered by a combination of a scripted request and a larger request.
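Each chunk written to the stream is a complete bulk request body in NDJSON form: an action/metadata line followed by a source line, each terminated by a newline. The sketch below builds such a chunk with `encoding/json`, which guarantees that every line is valid JSON. `bulkChunk` is a hypothetical helper name and is not part of any OpenSearch client.

```go
package main

import "encoding/json"

// bulkChunk (hypothetical helper) encodes one bulk operation as NDJSON:
// the action/metadata line, a newline, the document source, a newline.
func bulkChunk(action, source any) ([]byte, error) {
	actionLine, err := json.Marshal(action)
	if err != nil {
		return nil, err
	}
	sourceLine, err := json.Marshal(source)
	if err != nil {
		return nil, err
	}
	chunk := make([]byte, 0, len(actionLine)+len(sourceLine)+2)
	chunk = append(chunk, actionLine...)
	chunk = append(chunk, '\n')
	chunk = append(chunk, sourceLine...)
	chunk = append(chunk, '\n')
	return chunk, nil
}
```

For example, `bulkChunk(map[string]any{"index": map[string]any{"_index": "test"}}, map[string]any{"hello": "world"})` yields `{"index":{"_index":"test"}}` and `{"hello":"world"}` on two newline-terminated lines.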
Related component
Indexing
To Reproduce
I'm running OpenSearch 2.17.0 locally in Docker, using the following Dockerfile (opensearch/OpenSearch.Dockerfile) and Compose service:
FROM opensearchproject/opensearch:2.17.0
RUN ./bin/opensearch-plugin install transport-reactor-netty4
opensearch:
  ports:
    - "9200:9200"
  build:
    context: opensearch
    dockerfile: OpenSearch.Dockerfile
  environment:
    - "discovery.type=single-node"
    - "bootstrap.memory_lock=true"
    - "http.type=reactor-netty4"
    - "DISABLE_INSTALL_DEMO_CONFIG=true" # Prevents execution of bundled demo script which installs demo certificates and security configurations to OpenSearch
    - "DISABLE_SECURITY_PLUGIN=true" # Disables security plugin
    - "DISABLE_PERFORMANCE_ANALYZER_AGENT_CLI=true" # Disables Performance Analyzer Plugin
  ulimits:
    memlock:
      soft: -1 # Set memlock to unlimited (no soft or hard limit)
      hard: -1
    nofile:
      soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
      hard: 65536
  user: "1000:1000"
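The streaming endpoint relies on the reactor-netty4 HTTP transport, which is why the setup above installs the plugin and sets `http.type=reactor-netty4`. It may be worth confirming that the setting actually took effect on the node. The sketch below assumes that the nodes info API (`GET /_nodes/settings?flat_settings=true`) reports explicitly configured settings as flat keys under `nodes.<id>.settings`. The names `nodesSettings`, `httpTypes`, and `fetchHTTPTypes` are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// nodesSettings models only the part of the nodes-info response used here,
// assuming flat_settings=true so setting keys look like "http.type".
type nodesSettings struct {
	Nodes map[string]struct {
		Name     string         `json:"name"`
		Settings map[string]any `json:"settings"`
	} `json:"nodes"`
}

// httpTypes maps each node name to its configured http.type ("" if unset).
func httpTypes(body []byte) (map[string]string, error) {
	var ns nodesSettings
	if err := json.Unmarshal(body, &ns); err != nil {
		return nil, err
	}
	out := make(map[string]string, len(ns.Nodes))
	for _, node := range ns.Nodes {
		t, _ := node.Settings["http.type"].(string)
		out[node.Name] = t
	}
	return out, nil
}

// fetchHTTPTypes queries a node (e.g. "http://localhost:9200") for its settings.
func fetchHTTPTypes(ctx context.Context, baseURL string) (map[string]string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/_nodes/settings?flat_settings=true", nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s", resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return httpTypes(body)
}
```

Against the container above, `fetchHTTPTypes(ctx, "http://localhost:9200")` would be expected to report `reactor-netty4` for the single node if the environment variable was applied.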
Then run the following Go program (apologies, I can't replicate this with a simple curl request, as I can't control how curl splits the body into one chunk per document):
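package main

import (
	"context"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"os/signal"
	"syscall"
)

func main() {
	// Stop the program on Ctrl+C or SIGTERM (useful once it hangs).
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	// Clone the default transport and disable transparent gzip, so the
	// response body is read exactly as the server streams it.
	tr := http.DefaultTransport.(*http.Transport).Clone()
	tr.DisableCompression = true

	// Request pipe: chunks written to reqWriter are streamed as the request body.
	reqReader, reqWriter := io.Pipe()
	// Response pipe: the streamed response body is copied into respWriter.
	respReader, respWriter := io.Pipe()
	streamReady := make(chan struct{})

	go func() {
		req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://localhost:9200/_bulk/stream", reqReader)
		if err != nil {
			panic(err)
		}
		// Unknown length, so the body is sent with chunked transfer encoding.
		req.ContentLength = -1
		req.Header.Set("Content-Type", "application/json")

		log.Println("Sending stream request...")
		// RoundTrip returns as soon as the response headers arrive, while the
		// request body is still being streamed from reqReader.
		resp, err := tr.RoundTrip(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		log.Println("Ready to stream...")
		streamReady <- struct{}{}

		// Forward every streamed response byte to respReader.
		_, err = io.Copy(respWriter, resp.Body)
		if err != nil {
			panic(err)
		}
	}()

	// Wait for the response headers before writing any bulk requests.
	<-streamReady

	for _, body := range [][]byte{
		// Body containing a script and upsert, with a fixed _id (178 bytes).
		[]byte(`{ "index": { "_index": "test", "_id": "123" }
{"script": {"source": "ctx._source.counter += params.count", "lang": "painless", "params": {"count": 1}}, "upsert": {"counter": 1}}
`),
		// Larger document (1878 bytes in total).
		[]byte(`{ "index": { "_index": "test" } }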
{ "hello": "world", "big": "aaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" }
`),
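	} {
		// Write one bulk request to the open stream.
		n, err := reqWriter.Write(body)
		if err != nil {
			panic(err)
		}
		log.Printf("Written doc (%d)\n", n)

		// Block until the next bulk response is streamed back.
		var out json.RawMessage
		if err := json.NewDecoder(respReader).Decode(&out); err != nil {
			panic(err)
		}
		log.Printf("Received: %s", out)
	}

	// Keep running until interrupted.
	<-ctx.Done()
}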
This prints:
2024/09/23 08:49:35 Sending stream request...
2024/09/23 08:49:35 Ready to stream...
2024/09/23 08:49:35 Written doc (178)
2024/09/23 08:49:35 Received: {"took":118,"errors":false,"items":[{"index":{"_index":"test","_id":"123","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}}]}
2024/09/23 08:49:35 Written doc (1878)
It then hangs: the second response is never received.
For comparison, sending the simple request below in an infinite loop runs continuously without issue:
{ "index": { "_index": "test" } }
{ "hello": "world" }
Expected behavior
Every bulk request written to the stream receives a corresponding response.
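Concretely, each chunk written to the stream should be matched by one bulk response whose `items` array has one entry per operation in that chunk, like the first response in the log above. The sketch below performs that check, using that logged response as sample input. `checkBulkResponse` is a hypothetical helper, and only the `took`, `errors`, and `items` fields are modeled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkResponse models the fields of a bulk response that the check uses.
type bulkResponse struct {
	Took   int                          `json:"took"`
	Errors bool                         `json:"errors"`
	Items  []map[string]json.RawMessage `json:"items"`
}

// checkBulkResponse (hypothetical) verifies that a streamed bulk response
// carries one item per operation in the chunk and reports no errors.
func checkBulkResponse(raw []byte, wantItems int) error {
	var resp bulkResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return fmt.Errorf("decode bulk response: %w", err)
	}
	if len(resp.Items) != wantItems {
		return fmt.Errorf("got %d items, want %d", len(resp.Items), wantItems)
	}
	if resp.Errors {
		return fmt.Errorf("bulk response reports errors (took %d ms)", resp.Took)
	}
	return nil
}
```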