Skip to content

Allow unparsed binary data to be used for ingestion #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -512,5 +512,5 @@ public static <TDocument> JsonpDeserializer<CreateRequest<TDocument>> createCrea
}
return params;

}, SimpleEndpoint.emptyMap(), true, CreateResponse._DESERIALIZER);
}, SimpleEndpoint.emptyMap(), r -> r.document(), CreateResponse._DESERIALIZER);
}
Original file line number Diff line number Diff line change
Expand Up @@ -663,5 +663,5 @@ public static <TDocument> JsonpDeserializer<IndexRequest<TDocument>> createIndex
}
return params;

}, SimpleEndpoint.emptyMap(), true, IndexResponse._DESERIALIZER);
}, SimpleEndpoint.emptyMap(), r -> r.document(), IndexResponse._DESERIALIZER);
}
54 changes: 54 additions & 0 deletions java-client/src/main/java/co/elastic/clients/json/JsonpUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,60 @@ public static void skipValue(JsonParser parser, Event event) {
}
}

/**
* Copy the JSON value at the current parser location to a JSON generator.
*/
public static void copy(JsonParser parser, JsonGenerator generator, JsonParser.Event event) {

switch (event) {
case START_OBJECT:
generator.writeStartObject();
while ((event = parser.next()) != Event.END_OBJECT) {
expectEvent(parser, Event.KEY_NAME, event);
generator.writeKey(parser.getString());
copy(parser, generator, parser.next());
}
generator.writeEnd();
break;

case START_ARRAY:
generator.writeStartArray();
generator.writeStartObject();
while ((event = parser.next()) != Event.END_ARRAY) {
copy(parser, generator, event);
}
generator.writeEnd();
break;

case VALUE_STRING:
generator.write(parser.getString());
break;

case VALUE_FALSE:
generator.write(false);
break;

case VALUE_TRUE:
generator.write(true);
break;

case VALUE_NULL:
generator.writeNull();
break;

case VALUE_NUMBER:
if (parser.isIntegralNumber()) {
generator.write(parser.getLong());
} else {
generator.write(parser.getBigDecimal());
}
break;

default:
throw new UnexpectedJsonEventException(parser, event);
}
}

public static <T> void serialize(T value, JsonGenerator generator, @Nullable JsonpSerializer<T> serializer, JsonpMapper mapper) {
if (serializer != null) {
serializer.serialize(value, generator, mapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,23 @@ default Map<String, String> headers(RequestT request) {
return Collections.emptyMap();
}

boolean hasRequestBody();
/**
* Get the body for a request. The caller must handle several cases depending on the interface implemented by the result:
* <li>
* {@code null} means the request has no body.
* </li>
* <li>
* {@link co.elastic.clients.json.NdJsonpSerializable} must be serialized as nd-json.
* </li>
* <li>
* {@link co.elastic.clients.util.BinaryData} must be serialized as is.
* </li>
* <li>
* All other objects must be serialized as JSON using a {@link co.elastic.clients.json.JsonpMapper}
* </li>
*/
@Nullable
Object body(RequestT request);

/**
* Is this status code to be considered as an error?
Expand All @@ -90,7 +106,7 @@ default BinaryEndpoint<RequestT> withBinaryResponse() {
this::requestUrl,
this::queryParameters,
this::headers,
this.hasRequestBody(),
this::body,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@

public class BinaryEndpoint<RequestT> extends EndpointBase<RequestT, BinaryResponse> {

public BinaryEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT,
Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
Function<RequestT, Object> body,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, queryParameters, headers, body);
}

public BinaryEndpoint(
String id,
Function<RequestT, String> method,
Expand All @@ -34,7 +47,7 @@ public BinaryEndpoint(
boolean hasRequestBody,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public BooleanEndpoint(
boolean hasRequestBody,
Object ignored // same number of arguments as SimpleEndpoint
) {
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
super(id, method, requestUrl, queryParameters, headers, hasRequestBody ? returnSelf() : returnNull());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ public Map<String, String> headers(Req request) {
return endpoint.headers(request);
}

@Nullable
@Override
public boolean hasRequestBody() {
return endpoint.hasRequestBody();
public Object body(Req request) {
return endpoint.body(request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import co.elastic.clients.transport.Endpoint;
import org.apache.http.client.utils.URLEncodedUtils;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class EndpointBase<RequestT, ResponseT> implements Endpoint<RequestT, ResponseT, ErrorResponse> {

private static final Function<?, Map<String, String>> EMPTY_MAP = x -> Collections.emptyMap();
private static final Function<?, Object> RETURN_NULL = x -> null;
private static final Function<?, ?> RETURN_SELF = x -> x;

/**
* Returns a function that always returns an empty String to String map. Useful to avoid creating lots of
Expand All @@ -41,27 +44,44 @@ public static <T> Function<T, Map<String, String>> emptyMap() {
return (Function<T, Map<String, String>>) EMPTY_MAP;
}

/**
* Returns a function that always returns {@code null}.
*/
@SuppressWarnings("unchecked")
static <T, U> Function<T, U> returnNull() {
return (Function<T, U>) RETURN_NULL;
}

/**
* Returns a function that always returns its parameter. It's similar to {@code Function.identity()} with the difference
* that the input and output generic parameters are different, making it suitable for use in a wider range of use cases.
*/
@SuppressWarnings("unchecked")
static <T, U> Function<T, U> returnSelf() {
return (Function<T, U>) RETURN_SELF;
}

protected final String id;
protected final Function<RequestT, String> method;
protected final Function<RequestT, String> requestUrl;
protected final Function<RequestT, Map<String, String>> queryParameters;
protected final Function<RequestT, Map<String, String>> headers;
protected final boolean hasRequestBody;
protected final Function<RequestT, Object> body;

public EndpointBase(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
boolean hasRequestBody
Function<RequestT, Object> body
) {
this.id = id;
this.method = method;
this.requestUrl = requestUrl;
this.queryParameters = queryParameters;
this.headers = headers;
this.hasRequestBody = hasRequestBody;
this.body = body;
}

@Override
Expand Down Expand Up @@ -89,9 +109,10 @@ public Map<String, String> headers(RequestT request) {
return this.headers.apply(request);
}

@Nullable
@Override
public boolean hasRequestBody() {
return this.hasRequestBody;
public Object body(RequestT request) {
return this.body.apply(request);
}

// ES-specific
Expand All @@ -114,7 +135,7 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
requestUrl,
queryParameters,
headers,
hasRequestBody,
body,
newResponseParser
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,12 @@
import co.elastic.clients.transport.JsonEndpoint;
import org.apache.http.client.utils.URLEncodedUtils;

import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class SimpleEndpoint<RequestT, ResponseT> extends EndpointBase<RequestT, ResponseT>
implements JsonEndpoint<RequestT, ResponseT, ErrorResponse> {

private static final Function<?, Map<String, String>> EMPTY_MAP = x -> Collections.emptyMap();

/**
* Returns a function that always returns an empty String to String map. Useful to avoid creating lots of
* duplicate lambdas in endpoints that don't have headers or parameters.
*/
@SuppressWarnings("unchecked")
public static <T> Function<T, Map<String, String>> emptyMap() {
return (Function<T, Map<String, String>>) EMPTY_MAP;
}

private final JsonpDeserializer<ResponseT> responseParser;

public SimpleEndpoint(
Expand All @@ -50,13 +38,33 @@ public SimpleEndpoint(
Function<RequestT, String> requestUrl,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
boolean hasRequestBody,
Function<RequestT, Object> body,
JsonpDeserializer<ResponseT> responseParser
) {
super(id, method, requestUrl, queryParameters, headers, hasRequestBody);
super(id, method, requestUrl, queryParameters, headers, body);
this.responseParser = responseParser;
}

public SimpleEndpoint(
String id,
Function<RequestT, String> method,
Function<RequestT, String> requestUrl,
Function<RequestT, Map<String, String>> queryParameters,
Function<RequestT, Map<String, String>> headers,
boolean hasResponseBody,
JsonpDeserializer<ResponseT> responseParser
) {
this(
id,
method,
requestUrl,
queryParameters,
headers,
hasResponseBody ? returnSelf() : returnNull(),
responseParser
);
}

@Override
public JsonpDeserializer<ResponseT> responseDeserializer() {
return this.responseParser;
Expand All @@ -76,7 +84,7 @@ public <NewResponseT> SimpleEndpoint<RequestT, NewResponseT> withResponseDeseria
requestUrl,
queryParameters,
headers,
hasRequestBody,
body,
newResponseParser
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -215,21 +216,41 @@ private <RequestT> org.elasticsearch.client.Request prepareLowLevelRequest(

clientReq.addParameters(params);

if (endpoint.hasRequestBody()) {
// Request has a body and must implement JsonpSerializable or NdJsonpSerializable

if (request instanceof NdJsonpSerializable) {
Object body = endpoint.body(request);
if (body != null) {
// Request has a body
if (body instanceof NdJsonpSerializable) {
List<ByteBuffer> lines = new ArrayList<>();
collectNdJsonLines(lines, (NdJsonpSerializable)request);
collectNdJsonLines(lines, (NdJsonpSerializable) request);
clientReq.setEntity(new MultiBufferEntity(lines, JsonContentType));

} else if (body instanceof BinaryData) {
BinaryData data = (BinaryData)body;

// ES expects the Accept and Content-Type headers to be consistent.
ContentType contentType;
String dataContentType = data.contentType();
if (co.elastic.clients.util.ContentType.APPLICATION_JSON.equals(dataContentType)) {
// Fast path
contentType = JsonContentType;
} else {
contentType = ContentType.parse(dataContentType);
}

clientReq.setEntity(new MultiBufferEntity(
Collections.singletonList(data.asByteBuffer()),
contentType
));

} else {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
mapper.serialize(request, generator);
mapper.serialize(body, generator);
generator.close();
clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType));
}
}

// Request parameter intercepted by LLRC
clientReq.addParameter("ignore", "400,401,403,404,405");
return clientReq;
Expand Down
Loading