Skip to content

add anyDocument interfaces and unit tests #1296 #1319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ static <I> InputCaller<I> on(DatabaseClient client, JSONWriteHandle apiDecl, Buf
return new InputEndpointImpl(client, apiDecl, new HandleProvider.ContentHandleProvider<>(inputHandle,null));
}

/**
* Constructs an instance of the InputCaller interface.
* This factory is useful primarily for parameters or return values of the anyDocument type.
* @param client the database client to use for making calls
* @param apiDecl the JSON api declaration specifying how to call the endpoint
* @param inputHandle the handles that provides the input content (such as StringHandle)
* @param <IC> the content type of the input handle
* @param <IR> the type for the data received by the input handle
* @param <I> the input handle
* @return the InputOutputCaller instance for calling the endpoint.
*/
static <IC,IR,I extends BufferableContentHandle<IC,IR>> InputCaller<I> onHandles(
DatabaseClient client, JSONWriteHandle apiDecl, I inputHandle
) {
return new InputEndpointImpl(client, apiDecl, new HandleProvider.DirectHandleProvider<>(inputHandle, null));
}

/**
* Makes one call to an endpoint that doesn't take endpoint constants, endpoint state, or a session.
* @param input the request data sent to the endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@ static <I,O> InputOutputCaller<I,O> on(
return new InputOutputEndpointImpl<>(client, apiDecl, new HandleProvider.ContentHandleProvider<>(inputHandle, outputHandle));
}

/**
* Constructs an instance of the InputOutputCaller interface.
* This factory is useful primarily for parameters or return values of the anyDocument type.
* @param client the database client to use for making calls
* @param apiDecl the JSON api declaration specifying how to call the endpoint
* @param inputHandle the handles that provides the input content (such as StringHandle)
* @param outputHandle the handles that provides the output content (such as BytesHandle)
* @param <IC> the content type of the input handle
* @param <IR> the type for the data received by the input handle
* @param <OC> the content type of the output handle
* @param <OR> the type for the data received by the output handle
* @param <I> the input handle
* @param <O> the output handle
* @return the InputOutputCaller instance for calling the endpoint.
*/
static <IC,IR,OC,OR,I extends BufferableContentHandle<IC,IR>,O extends BufferableContentHandle<OC,OR>> InputOutputCaller<I,O> onHandles(
DatabaseClient client, JSONWriteHandle apiDecl,
I inputHandle, O outputHandle
) {
return new InputOutputEndpointImpl<I, O>(client, apiDecl, (HandleProvider<I, O>) new HandleProvider.DirectHandleProvider<IC,IR,OC,OR>(inputHandle, outputHandle));
}

/**
* Makes one call to an endpoint that doesn't take endpoint constants, endpoint state, or a session.
* @param input the request data sent to the endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ static <O> OutputCaller<O> on(
return new OutputEndpointImpl(client, apiDecl, new HandleProvider.ContentHandleProvider<>(null, outputHandle));
}

/**
* Constructs an instance of the OutputCaller interface.
* This factory is useful primarily for parameters or return values of the anyDocument type.
* @param client the database client to use for making calls
* @param apiDecl the JSON api declaration specifying how to call the endpoint
* @param outputHandle the handles that provides the output content (such as BytesHandle)
* @param <OC> the content type of the output handle
* @param <OR> the type for the data received by the output handle
* @param <O> the output handle
* @return the InputOutputCaller instance for calling the endpoint.
*/
static <OC,OR,O extends BufferableContentHandle<OC,OR>> OutputCaller<O> onHandles(
DatabaseClient client, JSONWriteHandle apiDecl, O outputHandle
) {
return new OutputEndpointImpl(client, apiDecl, new HandleProvider.DirectHandleProvider<>(null, outputHandle));
}

/**
* Makes one call to an endpoint that doesn't take endpoint constants, endpoint state, or a session.
* @return the response data from the endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static abstract class ValuedefImpl {

if (!"session".equals(datatype)) {
this.format = Format.getFromDataType(datatype);
if (this.format == Format.UNKNOWN) {
if (this.format == Format.UNKNOWN && !"anyDocument".equals(datatype)) {
throw new IllegalArgumentException(
"datatype must specify a document format: " + datatype
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ abstract class IOCallerImpl<I,O> extends BaseCallerImpl {
throw new IllegalArgumentException(
"endpointState parameter requires return in endpoint: "+getEndpointPath()
);
} else if (this.endpointStateParamdef.getFormat() != this.returndef.getFormat()) {
} else if (this.endpointStateParamdef.getFormat() != this.returndef.getFormat() && !"anyDocument".equals(this.returndef.getDataType())) {
throw new IllegalArgumentException(
"endpointState format must match return format in endpoint: "+getEndpointPath()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.marklogic.client.dataservices.InputOutputCaller;
import com.marklogic.client.document.JSONDocumentManager;
import com.marklogic.client.impl.NodeConverter;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.StringHandle;
import org.junit.Test;

import java.io.IOException;
Expand Down Expand Up @@ -116,10 +118,11 @@ public void testInputOutputCallerImpl() throws IOException {
InputOutputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj), new InputStreamHandle(), new InputStreamHandle()
);

IOEndpoint.CallContext callContext = endpoint.newCallContext()
.withEndpointStateAs(endpointState)
.withEndpointConstantsAs(endpointConstants);
InputOutputCaller.BulkInputOutputCaller<InputStream, InputStream> bulkCaller =
endpoint.bulkCaller(endpoint.newCallContext()
.withEndpointStateAs(endpointState)
.withEndpointConstantsAs(endpointConstants));
endpoint.bulkCaller(callContext);
bulkCaller.setOutputListener(value -> {
String v = NodeConverter.InputStreamToString(value);
// System.out.println("received: "+v);
Expand All @@ -132,7 +135,78 @@ public void testInputOutputCallerImpl() throws IOException {
});
bulkCaller.awaitCompletion();

ObjectNode finalState = mapper.readValue(bulkCaller.getEndpointState(), ObjectNode.class);
ObjectNode finalState = mapper.readValue(callContext.getEndpointState().get(), ObjectNode.class);

assertEquals("mismatch between input and output size", input.size(), output.size());
assertEquals("mismatch between input and output elements", input, output);

assertNotNull("null final state", finalState);
assertTrue("final state not object", finalState.isObject());

JsonNode finalNext = finalState.get("next");
assertNotNull("null final next", finalNext);
assertTrue("final next not number", finalNext.isNumber());
assertEquals("mismatch on final next", workMax, finalNext.asInt());

JsonNode finalMax = finalState.get("workMax");
assertNotNull("null final max", finalMax);
assertTrue("final max not number", finalMax.isNumber());
assertEquals("mismatch on final max", workMax, finalMax.asInt());

JsonNode sessionCounter = finalState.get("sessionCounter");
assertNotNull("null final sessionCounter", sessionCounter);
assertTrue("final sessionCounter not number", sessionCounter.isNumber());
assertEquals("mismatch on final sessionCounter", callCount - 1, sessionCounter.asInt());

IOTestUtil.modMgr.delete(scriptPath, apiPath);
}

@Test
public void testInputOutputCallerImplAnyDoc() throws IOException {
String apiName = "bulkIOAnyDocumentInputOutputCaller.api";

int nextStart = 1;
int workMax = 4;

ObjectNode apiObj = IOTestUtil.readApi(apiName);
String scriptPath = IOTestUtil.getScriptPath(apiObj);
String apiPath = IOTestUtil.getApiPath(scriptPath);
IOTestUtil.load(apiName, apiObj, scriptPath, apiPath);

int batchSize = apiObj.get("$bulk").get("inputBatchSize").asInt();
int callCount = (workMax - nextStart) / batchSize +
(((workMax - nextStart) % batchSize) > 0 ? 1 : 0);

String endpointState = "{\"next\":"+nextStart+"}";
String endpointConstants = "{\"max\":"+workMax+"}";
Set<String> input = IOTestUtil.setOf( // Set.of(
"{\"docNum\":1, \"docName\":\"alpha\"}",
"{\"docNum\":2, \"docName\":\"beta\"}",
"{\"docNum\":3, \"docName\":\"gamma\"}"
);

Set<String> output = new HashSet<>();
InputOutputCaller<StringHandle, StringHandle> endpoint =
InputOutputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj), new StringHandle(), new StringHandle());
IOEndpoint.CallContext callContext = endpoint.newCallContext()
.withEndpointStateAs(endpointState)
.withEndpointConstantsAs(endpointConstants);
InputOutputCaller.BulkInputOutputCaller<StringHandle, StringHandle> bulkCaller =
endpoint.bulkCaller(callContext);
bulkCaller.setOutputListener(value -> {
String v = value.toString();
//System.out.println("received: "+v);
output.add(v);
});

input.stream().forEach(value -> {
//System.out.println("adding "+value);
bulkCaller.accept(new StringHandle(value).withFormat(Format.JSON));
});

bulkCaller.awaitCompletion();

ObjectNode finalState = mapper.readValue(callContext.getEndpointState().get(), ObjectNode.class);

assertEquals("mismatch between input and output size", input.size(), output.size());
assertEquals("mismatch between input and output elements", input, output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,27 @@
import com.marklogic.client.dataservices.IOEndpoint;
import com.marklogic.client.dataservices.InputCaller;
import com.marklogic.client.document.JSONDocumentManager;
import com.marklogic.client.impl.NodeConverter;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.StringHandle;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import static org.junit.Assert.*;

public class BulkIOInputCallerTest {

static ObjectNode apiObj;
static String apiName = "bulkIOInputCaller.api";
static ObjectNode[] apiObj = new ObjectNode[2];
static String[] apiNames = new String[]{"bulkIOInputCaller.api", "bulkIOAnyDocumentInputCaller.api"};
static String scriptPath;
static String apiPath;
static JSONDocumentManager docMgr;
Expand All @@ -46,10 +50,13 @@ public class BulkIOInputCallerTest {
@BeforeClass
public static void setup() throws Exception {
docMgr = IOTestUtil.db.newJSONDocumentManager();
apiObj = IOTestUtil.readApi(apiName);
scriptPath = IOTestUtil.getScriptPath(apiObj);
apiPath = IOTestUtil.getApiPath(scriptPath);
IOTestUtil.load(apiName, apiObj, scriptPath, apiPath);
for (int i = 0; i < apiNames.length; i++) {
String apiName = apiNames[i];
apiObj[i] = IOTestUtil.readApi(apiName);
scriptPath = IOTestUtil.getScriptPath(apiObj[i]);
apiPath = IOTestUtil.getApiPath(scriptPath);
IOTestUtil.load(apiName, apiObj[i], scriptPath, apiPath);
}
map = new HashMap<>();
}
@Test
Expand All @@ -61,7 +68,7 @@ public void bulkInputEndpointTestWithMultipleCallContexts() {
String endpointState1 = "{\"next\":"+1+"}";
String endpointConstants1 = "{\"max\":6,\"collection\":\"bulkInputTest_2\"}";

InputCaller<InputStream> loadEndpt = InputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj), new InputStreamHandle());
InputCaller<InputStream> loadEndpt = InputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj[0]), new InputStreamHandle());
IOEndpoint.CallContext[] callContextArray = {loadEndpt.newCallContext()
.withEndpointStateAs(endpointState)
.withEndpointConstantsAs(endpointConstants), loadEndpt.newCallContext()
Expand Down Expand Up @@ -90,6 +97,48 @@ public void bulkInputEndpointTestWithMultipleCallContexts() {
map.get("bulkInputTest_2") >= 1);
}

@Test
public void bulkInputEndpointTestWithAnyDocument() {

String endpointState = "{\"next\":"+1+"}";
String endpointConstants = "{\"max\":6,\"collection\":\"bulkInputTest_1\"}";

String endpointState1 = "{\"next\":"+1+"}";
String endpointConstants1 = "{\"max\":6,\"collection\":\"bulkInputTest_2\"}";

InputCaller<StringHandle> loadEndpt = InputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj[1]), new StringHandle());
IOEndpoint.CallContext[] callContextArray = {loadEndpt.newCallContext()
.withEndpointStateAs(endpointState)
.withEndpointConstantsAs(endpointConstants), loadEndpt.newCallContext()
.withEndpointStateAs(endpointState1)
.withEndpointConstantsAs(endpointConstants1)};
InputCaller.BulkInputCaller<StringHandle> loader = loadEndpt.bulkCaller(callContextArray);

Set<String> input = IOTestUtil.setOf( // Set.of(
"{\"docNum\":1, \"docName\":\"doc1\"}",
"{\"docNum\":2, \"docName\":\"doc2\"}",
"{\"docNum\":3, \"docName\":\"doc3\"}",
"{\"docNum\":4, \"docName\":\"doc4\"}",
"{\"docNum\":5, \"docName\":\"doc5\"}",
"{\"docNum\":6, \"docName\":\"doc6\"}",
"{\"docNum\":7, \"docName\":\"doc7\"}",
"{\"docNum\":8, \"docName\":\"doc8\"}"
);

input.stream().forEach(value -> {
//System.out.println("adding "+value);
loader.accept(new StringHandle(value).withFormat(Format.JSON));
});
loader.awaitCompletion();
checkDocuments("bulkInputTest_1");
checkDocuments("bulkInputTest_2");
assertTrue("Number of documents written not as expected.", counter == 4);
assertTrue("No documents written by first callContext in - bulkInputTest_1 collection.",
map.get("bulkInputTest_1") >= 1);
assertTrue("No documents written by second callContext in - bulkInputTest_2 collection.",
map.get("bulkInputTest_2") >= 1);
}

@AfterClass
public static void cleanup(){
IOTestUtil.modMgr.delete(scriptPath, apiPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import static org.junit.Assert.assertTrue;

public class BulkIOOutputCallerTest {
static ObjectNode apiObj;
static String apiName = "bulkIOOutputCaller.api";
static ObjectNode[] apiObj = new ObjectNode[2];
static String[] apiNames = new String[]{"bulkIOOutputCaller.api", "bulkIOAnyDocumentOutputCaller.api"};
static String scriptPath;
static String apiPath;
static JSONDocumentManager docMgr;
Expand All @@ -42,10 +42,13 @@ public class BulkIOOutputCallerTest {
@BeforeClass
public static void setup() throws Exception {
docMgr = IOTestUtil.db.newJSONDocumentManager();
apiObj = IOTestUtil.readApi(apiName);
scriptPath = IOTestUtil.getScriptPath(apiObj);
apiPath = IOTestUtil.getApiPath(scriptPath);
IOTestUtil.load(apiName, apiObj, scriptPath, apiPath);
for (int i = 0; i < apiNames.length; i++) {
String apiName = apiNames[i];
apiObj[i] = IOTestUtil.readApi(apiName);
scriptPath = IOTestUtil.getScriptPath(apiObj[i]);
apiPath = IOTestUtil.getApiPath(scriptPath);
IOTestUtil.load(apiName, apiObj[i], scriptPath, apiPath);
}
writeDocuments(10,20, collectionName_1);
writeDocuments(30,40, collectionName_2);
}
Expand All @@ -58,7 +61,7 @@ public void bulkOutputCallerTestWithMultipleCallContexts() {

String endpointState2 = "{\"next\":"+1+"}";
String endpointConstants2 = "{\"max\":6,\"limit\":5,\"collection\":\"bulkOutputTest_2\"}";
OutputCaller<InputStream> endpoint = OutputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj), new InputStreamHandle());
OutputCaller<InputStream> endpoint = OutputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj[0]), new InputStreamHandle());
IOEndpoint.CallContext[] callContextArray = {endpoint.newCallContext()
.withEndpointStateAs(endpointState2)
.withEndpointConstantsAs(endpointConstants2), endpoint.newCallContext()
Expand Down Expand Up @@ -89,6 +92,44 @@ public void bulkOutputCallerTestWithMultipleCallContexts() {
assertEquals("unexpected output values", expected, actual);
}

@Test
public void bulkOutputCallerTestWithAnyDocuments() {

String endpointState1 = "{\"next\":"+1+"}";
String endpointConstants1 = "{\"max\":6,\"limit\":5,\"collection\":\"bulkOutputTest_1\"}";

String endpointState2 = "{\"next\":"+1+"}";
String endpointConstants2 = "{\"max\":6,\"limit\":5,\"collection\":\"bulkOutputTest_2\"}";
OutputCaller<StringHandle> endpoint = OutputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj[1]), new StringHandle());
IOEndpoint.CallContext[] callContextArray = {endpoint.newCallContext()
.withEndpointStateAs(endpointState2)
.withEndpointConstantsAs(endpointConstants2), endpoint.newCallContext()
.withEndpointStateAs(endpointState1)
.withEndpointConstantsAs(endpointConstants1)};
OutputCaller.BulkOutputCaller<StringHandle> bulkCaller = endpoint.bulkCaller(callContextArray);
Set<String> actual = new ConcurrentSkipListSet<>();
final AtomicBoolean duplicated = new AtomicBoolean(false);
final AtomicBoolean exceptional = new AtomicBoolean(false);
bulkCaller.setOutputListener(output -> {
try {
String serialized = output.toString();
if (actual.contains(serialized)) {
duplicated.compareAndSet(false, true);
} else {
actual.add(serialized);
}
} catch (Exception e) {
e.printStackTrace();
exceptional.compareAndSet(false, true);
}
});

bulkCaller.awaitCompletion();
assertEquals("exceptions on calls", false, exceptional.get());
assertEquals("duplicate output", false, duplicated.get());
assertEquals("unexpected output count", expected.size(), actual.size());
}

@AfterClass
public static void cleanup() {

Expand Down
Loading