Skip to content

Commit c2574e8

Browse files
authored
Allow unparsed binary data to be used for ingestion (#508) (#512)
1 parent 382c671 commit c2574e8

File tree

16 files changed

+390
-71
lines changed

16 files changed

+390
-71
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/core/CreateRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -553,5 +553,5 @@ public static <TDocument> JsonpDeserializer<CreateRequest<TDocument>> createCrea
553553
}
554554
return params;
555555

556-
}, SimpleEndpoint.emptyMap(), true, CreateResponse._DESERIALIZER);
556+
}, SimpleEndpoint.emptyMap(), r -> r.document(), CreateResponse._DESERIALIZER);
557557
}

java-client/src/main/java/co/elastic/clients/elasticsearch/core/IndexRequest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -718,5 +718,5 @@ public static <TDocument> JsonpDeserializer<IndexRequest<TDocument>> createIndex
718718
}
719719
return params;
720720

721-
}, SimpleEndpoint.emptyMap(), true, IndexResponse._DESERIALIZER);
721+
}, SimpleEndpoint.emptyMap(), r -> r.document(), IndexResponse._DESERIALIZER);
722722
}

java-client/src/main/java/co/elastic/clients/json/JsonpUtils.java

+54
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,60 @@ public static void skipValue(JsonParser parser, Event event) {
150150
}
151151
}
152152

153+
/**
154+
* Copy the JSON value at the current parser location to a JSON generator.
155+
*/
156+
public static void copy(JsonParser parser, JsonGenerator generator, JsonParser.Event event) {
157+
158+
switch (event) {
159+
case START_OBJECT:
160+
generator.writeStartObject();
161+
while ((event = parser.next()) != Event.END_OBJECT) {
162+
expectEvent(parser, Event.KEY_NAME, event);
163+
generator.writeKey(parser.getString());
164+
copy(parser, generator, parser.next());
165+
}
166+
generator.writeEnd();
167+
break;
168+
169+
case START_ARRAY:
170+
generator.writeStartArray();
171+
generator.writeStartObject();
172+
while ((event = parser.next()) != Event.END_ARRAY) {
173+
copy(parser, generator, event);
174+
}
175+
generator.writeEnd();
176+
break;
177+
178+
case VALUE_STRING:
179+
generator.write(parser.getString());
180+
break;
181+
182+
case VALUE_FALSE:
183+
generator.write(false);
184+
break;
185+
186+
case VALUE_TRUE:
187+
generator.write(true);
188+
break;
189+
190+
case VALUE_NULL:
191+
generator.writeNull();
192+
break;
193+
194+
case VALUE_NUMBER:
195+
if (parser.isIntegralNumber()) {
196+
generator.write(parser.getLong());
197+
} else {
198+
generator.write(parser.getBigDecimal());
199+
}
200+
break;
201+
202+
default:
203+
throw new UnexpectedJsonEventException(parser, event);
204+
}
205+
}
206+
153207
public static <T> void serialize(T value, JsonGenerator generator, @Nullable JsonpSerializer<T> serializer, JsonpMapper mapper) {
154208
if (serializer != null) {
155209
serializer.serialize(value, generator, mapper);

java-client/src/main/java/co/elastic/clients/transport/Endpoint.java

+18-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,23 @@ default Map<String, String> headers(RequestT request) {
7070
return Collections.emptyMap();
7171
}
7272

73-
boolean hasRequestBody();
73+
/**
74+
* Get the body for a request. The caller must handle several cases depending on the interface implemented by the result:
75+
* <li>
76+
* {@code null} means the request has no body.
77+
* </li>
78+
* <li>
79+
* {@link co.elastic.clients.json.NdJsonpSerializable} must be serialized as nd-json.
80+
* </li>
81+
* <li>
82+
* {@link co.elastic.clients.util.BinaryData} must be serialized as is.
83+
* </li>
84+
* <li>
85+
* All other objects must be serialized as JSON using a {@link co.elastic.clients.json.JsonpMapper}
86+
* </li>
87+
*/
88+
@Nullable
89+
Object body(RequestT request);
7490

7591
/**
7692
* Is this status code to be considered as an error?
@@ -90,7 +106,7 @@ default BinaryEndpoint<RequestT> withBinaryResponse() {
90106
this::requestUrl,
91107
this::queryParameters,
92108
this::headers,
93-
this.hasRequestBody(),
109+
this::body,
94110
null
95111
);
96112
}

java-client/src/main/java/co/elastic/clients/transport/endpoints/BinaryEndpoint.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@
2424

2525
public class BinaryEndpoint<RequestT> extends EndpointBase<RequestT, BinaryResponse> {
2626

27+
public BinaryEndpoint(
28+
String id,
29+
Function<RequestT, String> method,
30+
Function<RequestT, String> requestUrl,
31+
Function<RequestT,
32+
Map<String, String>> queryParameters,
33+
Function<RequestT, Map<String, String>> headers,
34+
Function<RequestT, Object> body,
35+
Object ignored // same number of arguments as SimpleEndpoint
36+
) {
37+
super(id, method, requestUrl, queryParameters, headers, body);
38+
}
39+
2740
public BinaryEndpoint(
2841
String id,
2942
Function<RequestT, String> method,
@@ -34,7 +47,7 @@ public BinaryEndpoint(
3447
boolean hasRequestBody,
3548
Object ignored // same number of arguments as SimpleEndpoint
3649
) {
37-
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
50+
super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
3851
}
3952

4053
@Override

java-client/src/main/java/co/elastic/clients/transport/endpoints/BooleanEndpoint.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public BooleanEndpoint(
3434
boolean hasRequestBody,
3535
Object ignored // same number of arguments as SimpleEndpoint
3636
) {
37-
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
37+
super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
3838
}
3939

4040
@Override

java-client/src/main/java/co/elastic/clients/transport/endpoints/DelegatingJsonEndpoint.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ public Map<String, String> headers(Req request) {
5858
return endpoint.headers(request);
5959
}
6060

61+
@Nullable
6162
@Override
62-
public boolean hasRequestBody() {
63-
return endpoint.hasRequestBody();
63+
public Object body(Req request) {
64+
return endpoint.body(request);
6465
}
6566

6667
@Override

java-client/src/main/java/co/elastic/clients/transport/endpoints/EndpointBase.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
import co.elastic.clients.transport.Endpoint;
2525
import org.apache.http.client.utils.URLEncodedUtils;
2626

27+
import javax.annotation.Nullable;
2728
import java.util.Collections;
2829
import java.util.Map;
2930
import java.util.function.Function;
3031

3132
public class EndpointBase<RequestT, ResponseT> implements Endpoint<RequestT, ResponseT, ErrorResponse> {
3233

3334
private static final Function<?, Map<String, String>> EMPTY_MAP = x -> Collections.emptyMap();
35+
private static final Function<?, Object> RETURN_NULL = x -> null;
36+
private static final Function<?, ?> RETURN_SELF = x -> x;
3437

3538
/**
3639
* Returns a function that always returns an empty String to String map. Useful to avoid creating lots of
@@ -41,27 +44,44 @@ public static <T> Function<T, Map<String, String>> emptyMap() {
4144
return (Function<T, Map<String, String>>) EMPTY_MAP;
4245
}
4346

47+
/**
48+
* Returns a function that always returns {@code null}.
49+
*/
50+
@SuppressWarnings("unchecked")
51+
static <T, U> Function<T, U> returnNull() {
52+
return (Function<T, U>) RETURN_NULL;
53+
}
54+
55+
/**
56+
* Returns a function that always returns its parameter. It's similar to {@code Function.identity()} with the difference
57+
* that the input and output generic parameters are different, making it suitable for use in a wider range of use cases.
58+
*/
59+
@SuppressWarnings("unchecked")
60+
static <T, U> Function<T, U> returnSelf() {
61+
return (Function<T, U>) RETURN_SELF;
62+
}
63+
4464
protected final String id;
4565
protected final Function<RequestT, String> method;
4666
protected final Function<RequestT, String> requestUrl;
4767
protected final Function<RequestT, Map<String, String>> queryParameters;
4868
protected final Function<RequestT, Map<String, String>> headers;
49-
protected final boolean hasRequestBody;
69+
protected final Function<RequestT, Object> body;
5070

5171
public EndpointBase(
5272
String id,
5373
Function<RequestT, String> method,
5474
Function<RequestT, String> requestUrl,
5575
Function<RequestT, Map<String, String>> queryParameters,
5676
Function<RequestT, Map<String, String>> headers,
57-
boolean hasRequestBody
77+
Function<RequestT, Object> body
5878
) {
5979
this.id = id;
6080
this.method = method;
6181
this.requestUrl = requestUrl;
6282
this.queryParameters = queryParameters;
6383
this.headers = headers;
64-
this.hasRequestBody = hasRequestBody;
84+
this.body = body;
6585
}
6686

6787
@Override
@@ -89,9 +109,10 @@ public Map<String, String> headers(RequestT request) {
89109
return this.headers.apply(request);
90110
}
91111

112+
@Nullable
92113
@Override
93-
public boolean hasRequestBody() {
94-
return this.hasRequestBody;
114+
public Object body(RequestT request) {
115+
return this.body.apply(request);
95116
}
96117

97118
// ES-specific
@@ -114,7 +135,7 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
114135
requestUrl,
115136
queryParameters,
116137
headers,
117-
hasRequestBody,
138+
body,
118139
newResponseParser
119140
);
120141
}

java-client/src/main/java/co/elastic/clients/transport/endpoints/SimpleEndpoint.java

+23-15
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,12 @@
2424
import co.elastic.clients.transport.JsonEndpoint;
2525
import org.apache.http.client.utils.URLEncodedUtils;
2626

27-
import java.util.Collections;
2827
import java.util.Map;
2928
import java.util.function.Function;
3029

3130
public class SimpleEndpoint<RequestT, ResponseT> extends EndpointBase<RequestT, ResponseT>
3231
implements JsonEndpoint<RequestT, ResponseT, ErrorResponse> {
3332

34-
private static final Function<?, Map<String, String>> EMPTY_MAP = x -> Collections.emptyMap();
35-
36-
/**
37-
* Returns a function that always returns an empty String to String map. Useful to avoid creating lots of
38-
* duplicate lambdas in endpoints that don't have headers or parameters.
39-
*/
40-
@SuppressWarnings("unchecked")
41-
public static <T> Function<T, Map<String, String>> emptyMap() {
42-
return (Function<T, Map<String, String>>) EMPTY_MAP;
43-
}
44-
4533
private final JsonpDeserializer<ResponseT> responseParser;
4634

4735
public SimpleEndpoint(
@@ -50,13 +38,33 @@ public SimpleEndpoint(
5038
Function<RequestT, String> requestUrl,
5139
Function<RequestT, Map<String, String>> queryParameters,
5240
Function<RequestT, Map<String, String>> headers,
53-
boolean hasRequestBody,
41+
Function<RequestT, Object> body,
5442
JsonpDeserializer<ResponseT> responseParser
5543
) {
56-
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
44+
super(id, method, requestUrl, queryParameters, headers, body);
5745
this.responseParser = responseParser;
5846
}
5947

48+
public SimpleEndpoint(
49+
String id,
50+
Function<RequestT, String> method,
51+
Function<RequestT, String> requestUrl,
52+
Function<RequestT, Map<String, String>> queryParameters,
53+
Function<RequestT, Map<String, String>> headers,
54+
boolean hasResponseBody,
55+
JsonpDeserializer<ResponseT> responseParser
56+
) {
57+
this(
58+
id,
59+
method,
60+
requestUrl,
61+
queryParameters,
62+
headers,
63+
hasResponseBody ? returnSelf() : returnNull(),
64+
responseParser
65+
);
66+
}
67+
6068
@Override
6169
public JsonpDeserializer<ResponseT> responseDeserializer() {
6270
return this.responseParser;
@@ -76,7 +84,7 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
7684
requestUrl,
7785
queryParameters,
7886
headers,
79-
hasRequestBody,
87+
body,
8088
newResponseParser
8189
);
8290
}

java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientTransport.java

+27-6
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.nio.charset.StandardCharsets;
6060
import java.util.ArrayList;
6161
import java.util.Arrays;
62+
import java.util.Collections;
6263
import java.util.HashSet;
6364
import java.util.Iterator;
6465
import java.util.List;
@@ -215,21 +216,41 @@ private <RequestT> org.elasticsearch.client.Request prepareLowLevelRequest(
215216

216217
clientReq.addParameters(params);
217218

218-
if (endpoint.hasRequestBody()) {
219-
// Request has a body and must implement JsonpSerializable or NdJsonpSerializable
220-
221-
if (request instanceof NdJsonpSerializable) {
219+
Object body = endpoint.body(request);
220+
if (body != null) {
221+
// Request has a body
222+
if (body instanceof NdJsonpSerializable) {
222223
List<ByteBuffer> lines = new ArrayList<>();
223-
collectNdJsonLines(lines, (NdJsonpSerializable)request);
224+
collectNdJsonLines(lines, (NdJsonpSerializable) request);
224225
clientReq.setEntity(new MultiBufferEntity(lines, JsonContentType));
226+
227+
} else if (body instanceof BinaryData) {
228+
BinaryData data = (BinaryData)body;
229+
230+
// ES expects the Accept and Content-Type headers to be consistent.
231+
ContentType contentType;
232+
String dataContentType = data.contentType();
233+
if (co.elastic.clients.util.ContentType.APPLICATION_JSON.equals(dataContentType)) {
234+
// Fast path
235+
contentType = JsonContentType;
236+
} else {
237+
contentType = ContentType.parse(dataContentType);
238+
}
239+
240+
clientReq.setEntity(new MultiBufferEntity(
241+
Collections.singletonList(data.asByteBuffer()),
242+
contentType
243+
));
244+
225245
} else {
226246
ByteArrayOutputStream baos = new ByteArrayOutputStream();
227247
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
228-
mapper.serialize(request, generator);
248+
mapper.serialize(body, generator);
229249
generator.close();
230250
clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType));
231251
}
232252
}
253+
233254
// Request parameter intercepted by LLRC
234255
clientReq.addParameter("ignore", "400,401,403,404,405");
235256
return clientReq;

0 commit comments

Comments
 (0)