From 1c21373fd8e021f68d419c9838fec7da28618945 Mon Sep 17 00:00:00 2001 From: lling Date: Fri, 24 Sep 2021 01:54:51 -0700 Subject: [PATCH 1/2] add anyDocument interfaces and unit tests --- .../client/dataservices/InputCaller.java | 17 +++++ .../dataservices/InputOutputCaller.java | 22 ++++++ .../client/dataservices/OutputCaller.java | 17 +++++ .../dataservices/impl/BaseCallerImpl.java | 2 +- .../dataservices/impl/IOCallerImpl.java | 2 +- .../test/dataservices/BulkIOEndpointTest.java | 73 +++++++++++++++++++ .../dataservices/BulkIOInputCallerTest.java | 63 ++++++++++++++-- .../dataservices/BulkIOOutputCallerTest.java | 55 ++++++++++++-- .../bulkIOAnyDocumentInputCaller.api | 32 ++++++++ .../bulkIOAnyDocumentInputCaller.sjs | 27 +++++++ .../bulkIOAnyDocumentInputOutputCaller.api | 32 ++++++++ .../bulkIOAnyDocumentInputOutputCaller.sjs | 21 ++++++ .../bulkIOAnyDocumentOutputCaller.api | 24 ++++++ .../bulkIOAnyDocumentOutputCaller.sjs | 19 +++++ 14 files changed, 390 insertions(+), 16 deletions(-) create mode 100644 marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.api create mode 100644 marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.sjs create mode 100644 marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.api create mode 100644 marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs create mode 100644 marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.api create mode 100644 marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.sjs diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputCaller.java b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputCaller.java index 76e9649ad..0c5550c83 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputCaller.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputCaller.java @@ -41,6 +41,23 @@ static InputCaller on(DatabaseClient client, JSONWriteHandle apiDecl, Buf return new InputEndpointImpl(client, apiDecl, new HandleProvider.ContentHandleProvider<>(inputHandle,null)); } + /** + * Constructs an instance of the InputCaller interface. + * This factory is useful primarily for parameters or return values of the anyDocument type. + * @param client the database client to use for making calls + * @param apiDecl the JSON api declaration specifying how to call the endpoint + * @param inputHandle the handles that provides the input content (such as StringHandle) + * @param the content type of the input handle + * @param the type for the data received by the input handle + * @param the input handle + * @return the InputOutputCaller instance for calling the endpoint. + */ + static > InputCaller onHandles( + DatabaseClient client, JSONWriteHandle apiDecl, I inputHandle + ) { + return new InputEndpointImpl(client, apiDecl, new HandleProvider.DirectHandleProvider<>(inputHandle, null)); + } + /** * Makes one call to an endpoint that doesn't take endpoint constants, endpoint state, or a session. * @param input the request data sent to the endpoint diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputOutputCaller.java b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputOutputCaller.java index 1101cded9..54f26ba01 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputOutputCaller.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/InputOutputCaller.java @@ -50,6 +50,28 @@ static InputOutputCaller on( return new InputOutputEndpointImpl<>(client, apiDecl, new HandleProvider.ContentHandleProvider<>(inputHandle, outputHandle)); } + /** + * Constructs an instance of the InputOutputCaller interface. + * This factory is useful primarily for parameters or return values of the anyDocument type. + * @param client the database client to use for making calls + * @param apiDecl the JSON api declaration specifying how to call the endpoint + * @param inputHandle the handles that provides the input content (such as StringHandle) + * @param outputHandle the handles that provides the output content (such as BytesHandle) + * @param the content type of the input handle + * @param the type for the data received by the input handle + * @param the content type of the output handle + * @param the type for the data received by the output handle + * @param the input handle + * @param the output handle + * @return the InputOutputCaller instance for calling the endpoint. + */ + static ,O extends BufferableContentHandle> InputOutputCaller onHandles( + DatabaseClient client, JSONWriteHandle apiDecl, + I inputHandle, O outputHandle + ) { + return new InputOutputEndpointImpl(client, apiDecl, (HandleProvider) new HandleProvider.DirectHandleProvider(inputHandle, outputHandle)); + } + /** * Makes one call to an endpoint that doesn't take endpoint constants, endpoint state, or a session. * @param input the request data sent to the endpoint diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/OutputCaller.java b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/OutputCaller.java index cdf501e36..8abb11d47 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/OutputCaller.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/OutputCaller.java @@ -44,6 +44,23 @@ static OutputCaller on( return new OutputEndpointImpl(client, apiDecl, new HandleProvider.ContentHandleProvider<>(null, outputHandle)); } + /** + * Constructs an instance of the OutputCaller interface. + * This factory is useful primarily for parameters or return values of the anyDocument type. + * @param client the database client to use for making calls + * @param apiDecl the JSON api declaration specifying how to call the endpoint + * @param outputHandle the handles that provides the output content (such as BytesHandle) + * @param the content type of the output handle + * @param the type for the data received by the output handle + * @param the output handle + * @return the InputOutputCaller instance for calling the endpoint. + */ + static > OutputCaller onHandles( + DatabaseClient client, JSONWriteHandle apiDecl, O outputHandle + ) { + return new OutputEndpointImpl(client, apiDecl, new HandleProvider.DirectHandleProvider<>(null, outputHandle)); + } + /** * Makes one call to an endpoint that doesn't take endpoint constants, endpoint state, or a session. * @return the response data from the endpoint diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/BaseCallerImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/BaseCallerImpl.java index a736f6fa6..957f6af13 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/BaseCallerImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/BaseCallerImpl.java @@ -54,7 +54,7 @@ static abstract class ValuedefImpl { if (!"session".equals(datatype)) { this.format = Format.getFromDataType(datatype); - if (this.format == Format.UNKNOWN) { + if (this.format == Format.UNKNOWN && !"anyDocument".equals(datatype)) { throw new IllegalArgumentException( "datatype must specify a document format: " + datatype ); diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOCallerImpl.java b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOCallerImpl.java index 41e434fc9..67fa889d3 100644 --- a/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOCallerImpl.java +++ b/marklogic-client-api/src/main/java/com/marklogic/client/dataservices/impl/IOCallerImpl.java @@ -164,7 +164,7 @@ abstract class IOCallerImpl extends BaseCallerImpl { throw new IllegalArgumentException( "endpointState parameter requires return in endpoint: "+getEndpointPath() ); - } else if (this.endpointStateParamdef.getFormat() != this.returndef.getFormat()) { + } else if (this.endpointStateParamdef.getFormat() != this.returndef.getFormat() && !"anyDocument".equals(this.returndef.getDataType())) { throw new IllegalArgumentException( "endpointState format must match return format in endpoint: "+getEndpointPath() ); diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java index 3ebc3f320..6b3953f45 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java @@ -23,8 +23,10 @@ import com.marklogic.client.dataservices.InputOutputCaller; import com.marklogic.client.document.JSONDocumentManager; import com.marklogic.client.impl.NodeConverter; +import com.marklogic.client.io.Format; import com.marklogic.client.io.InputStreamHandle; import com.marklogic.client.io.JacksonHandle; +import com.marklogic.client.io.StringHandle; import org.junit.Test; import java.io.IOException; @@ -158,6 +160,77 @@ public void testInputOutputCallerImpl() throws IOException { IOTestUtil.modMgr.delete(scriptPath, apiPath); } + @Test + public void testInputOutputCallerImplAnyDoc() throws IOException { + String apiName = "bulkIOAnyDocumentInputOutputCaller.api"; + + int nextStart = 1; + int workMax = 4; + + ObjectNode apiObj = IOTestUtil.readApi(apiName); + String scriptPath = IOTestUtil.getScriptPath(apiObj); + String apiPath = IOTestUtil.getApiPath(scriptPath); + IOTestUtil.load(apiName, apiObj, scriptPath, apiPath); + + int batchSize = apiObj.get("$bulk").get("inputBatchSize").asInt(); + int callCount = (workMax - nextStart) / batchSize + + (((workMax - nextStart) % batchSize) > 0 ? 1 : 0); + + String endpointState = "{\"next\":"+nextStart+"}"; + String endpointConstants = "{\"max\":"+workMax+"}"; + Set input = IOTestUtil.setOf( // Set.of( + "{\"docNum\":1, \"docName\":\"alpha\"}", + "{\"docNum\":2, \"docName\":\"beta\"}", + "{\"docNum\":3, \"docName\":\"gamma\"}" + ); + + Set output = new HashSet<>(); + InputOutputCaller endpoint = + InputOutputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj), new StringHandle(), new StringHandle()); + + InputOutputCaller.BulkInputOutputCaller bulkCaller = + endpoint.bulkCaller(endpoint.newCallContext() + .withEndpointStateAs(endpointState) + .withEndpointConstantsAs(endpointConstants)); + bulkCaller.setOutputListener(value -> { + String v = value.toString(); + System.out.println("received: "+v); + output.add(v); + }); + + input.stream().forEach(value -> { + System.out.println("adding "+value); + bulkCaller.accept(new StringHandle(value).withFormat(Format.JSON)); + }); + + bulkCaller.awaitCompletion(); + + ObjectNode finalState = mapper.readValue(bulkCaller.getEndpointState(), ObjectNode.class); + + assertEquals("mismatch between input and output size", input.size(), output.size()); + assertEquals("mismatch between input and output elements", input, output); + + assertNotNull("null final state", finalState); + assertTrue("final state not object", finalState.isObject()); + + JsonNode finalNext = finalState.get("next"); + assertNotNull("null final next", finalNext); + assertTrue("final next not number", finalNext.isNumber()); + assertEquals("mismatch on final next", workMax, finalNext.asInt()); + + JsonNode finalMax = finalState.get("workMax"); + assertNotNull("null final max", finalMax); + assertTrue("final max not number", finalMax.isNumber()); + assertEquals("mismatch on final max", workMax, finalMax.asInt()); + +// JsonNode sessionCounter = finalState.get("sessionCounter"); +// assertNotNull("null final sessionCounter", sessionCounter); +// assertTrue("final sessionCounter not number", sessionCounter.isNumber()); +// assertEquals("mismatch on final sessionCounter", callCount - 1, sessionCounter.asInt()); + + IOTestUtil.modMgr.delete(scriptPath, apiPath); + } + @Test public void testInputOutputCallerImplWithMultipleCallContexts() throws IOException { String apiName = "bulkIOInputOutputCaller.api"; diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOInputCallerTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOInputCallerTest.java index 53fd4aa26..ef3f33a05 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOInputCallerTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOInputCallerTest.java @@ -20,8 +20,11 @@ import com.marklogic.client.dataservices.IOEndpoint; import com.marklogic.client.dataservices.InputCaller; import com.marklogic.client.document.JSONDocumentManager; +import com.marklogic.client.impl.NodeConverter; +import com.marklogic.client.io.Format; import com.marklogic.client.io.InputStreamHandle; import com.marklogic.client.io.JacksonHandle; +import com.marklogic.client.io.StringHandle; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -29,14 +32,15 @@ import java.io.InputStream; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.stream.Stream; import static org.junit.Assert.*; public class BulkIOInputCallerTest { - static ObjectNode apiObj; - static String apiName = "bulkIOInputCaller.api"; + static ObjectNode[] apiObj = new ObjectNode[2]; + static String[] apiNames = new String[]{"bulkIOInputCaller.api", "bulkIOAnyDocumentInputCaller.api"}; static String scriptPath; static String apiPath; static JSONDocumentManager docMgr; @@ -46,10 +50,13 @@ public class BulkIOInputCallerTest { @BeforeClass public static void setup() throws Exception { docMgr = IOTestUtil.db.newJSONDocumentManager(); - apiObj = IOTestUtil.readApi(apiName); - scriptPath = IOTestUtil.getScriptPath(apiObj); - apiPath = IOTestUtil.getApiPath(scriptPath); - IOTestUtil.load(apiName, apiObj, scriptPath, apiPath); + for (int i = 0; i < apiNames.length; i++) { + String apiName = apiNames[i]; + apiObj[i] = IOTestUtil.readApi(apiName); + scriptPath = IOTestUtil.getScriptPath(apiObj[i]); + apiPath = IOTestUtil.getApiPath(scriptPath); + IOTestUtil.load(apiName, apiObj[i], scriptPath, apiPath); + } map = new HashMap<>(); } @Test @@ -61,7 +68,7 @@ public void bulkInputEndpointTestWithMultipleCallContexts() { String endpointState1 = "{\"next\":"+1+"}"; String endpointConstants1 = "{\"max\":6,\"collection\":\"bulkInputTest_2\"}"; - InputCaller loadEndpt = InputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj), new InputStreamHandle()); + InputCaller loadEndpt = InputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj[0]), new InputStreamHandle()); IOEndpoint.CallContext[] callContextArray = {loadEndpt.newCallContext() .withEndpointStateAs(endpointState) .withEndpointConstantsAs(endpointConstants), loadEndpt.newCallContext() @@ -90,6 +97,48 @@ public void bulkInputEndpointTestWithMultipleCallContexts() { map.get("bulkInputTest_2") >= 1); } + @Test + public void bulkInputEndpointTestWithAnyDocument() { + + String endpointState = "{\"next\":"+1+"}"; + String endpointConstants = "{\"max\":6,\"collection\":\"bulkInputTest_1\"}"; + + String endpointState1 = "{\"next\":"+1+"}"; + String endpointConstants1 = "{\"max\":6,\"collection\":\"bulkInputTest_2\"}"; + + InputCaller loadEndpt = InputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj[1]), new StringHandle()); + IOEndpoint.CallContext[] callContextArray = {loadEndpt.newCallContext() + .withEndpointStateAs(endpointState) + .withEndpointConstantsAs(endpointConstants), loadEndpt.newCallContext() + .withEndpointStateAs(endpointState1) + .withEndpointConstantsAs(endpointConstants1)}; + InputCaller.BulkInputCaller loader = loadEndpt.bulkCaller(callContextArray); + + Set input = IOTestUtil.setOf( // Set.of( + "{\"docNum\":1, \"docName\":\"doc1\"}", + "{\"docNum\":2, \"docName\":\"doc2\"}", + "{\"docNum\":3, \"docName\":\"doc3\"}", + "{\"docNum\":4, \"docName\":\"doc4\"}", + "{\"docNum\":5, \"docName\":\"doc5\"}", + "{\"docNum\":6, \"docName\":\"doc6\"}", + "{\"docNum\":7, \"docName\":\"doc7\"}", + "{\"docNum\":8, \"docName\":\"doc8\"}" + ); + + input.stream().forEach(value -> { + //System.out.println("adding "+value); + loader.accept(new StringHandle(value).withFormat(Format.JSON)); + }); + loader.awaitCompletion(); + checkDocuments("bulkInputTest_1"); + checkDocuments("bulkInputTest_2"); + assertTrue("Number of documents written not as expected.", counter == 4); + assertTrue("No documents written by first callContext in - bulkInputTest_1 collection.", + map.get("bulkInputTest_1") >= 1); + assertTrue("No documents written by second callContext in - bulkInputTest_2 collection.", + map.get("bulkInputTest_2") >= 1); + } + @AfterClass public static void cleanup(){ IOTestUtil.modMgr.delete(scriptPath, apiPath); diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOOutputCallerTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOOutputCallerTest.java index 508716af3..944a8178a 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOOutputCallerTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOOutputCallerTest.java @@ -26,8 +26,8 @@ import static org.junit.Assert.assertTrue; public class BulkIOOutputCallerTest { - static ObjectNode apiObj; - static String apiName = "bulkIOOutputCaller.api"; + static ObjectNode[] apiObj = new ObjectNode[2]; + static String[] apiNames = new String[]{"bulkIOOutputCaller.api", "bulkIOAnyDocumentOutputCaller.api"}; static String scriptPath; static String apiPath; static JSONDocumentManager docMgr; @@ -42,10 +42,13 @@ public class BulkIOOutputCallerTest { @BeforeClass public static void setup() throws Exception { docMgr = IOTestUtil.db.newJSONDocumentManager(); - apiObj = IOTestUtil.readApi(apiName); - scriptPath = IOTestUtil.getScriptPath(apiObj); - apiPath = IOTestUtil.getApiPath(scriptPath); - IOTestUtil.load(apiName, apiObj, scriptPath, apiPath); + for (int i = 0; i < apiNames.length; i++) { + String apiName = apiNames[i]; + apiObj[i] = IOTestUtil.readApi(apiName); + scriptPath = IOTestUtil.getScriptPath(apiObj[i]); + apiPath = IOTestUtil.getApiPath(scriptPath); + IOTestUtil.load(apiName, apiObj[i], scriptPath, apiPath); + } writeDocuments(10,20, collectionName_1); writeDocuments(30,40, collectionName_2); } @@ -58,7 +61,7 @@ public void bulkOutputCallerTestWithMultipleCallContexts() { String endpointState2 = "{\"next\":"+1+"}"; String endpointConstants2 = "{\"max\":6,\"limit\":5,\"collection\":\"bulkOutputTest_2\"}"; - OutputCaller endpoint = OutputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj), new InputStreamHandle()); + OutputCaller endpoint = OutputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj[0]), new InputStreamHandle()); IOEndpoint.CallContext[] callContextArray = {endpoint.newCallContext() .withEndpointStateAs(endpointState2) .withEndpointConstantsAs(endpointConstants2), endpoint.newCallContext() @@ -89,6 +92,44 @@ public void bulkOutputCallerTestWithMultipleCallContexts() { assertEquals("unexpected output values", expected, actual); } + @Test + public void bulkOutputCallerTestWithAnyDocuments() { + + String endpointState1 = "{\"next\":"+1+"}"; + String endpointConstants1 = "{\"max\":6,\"limit\":5,\"collection\":\"bulkOutputTest_1\"}"; + + String endpointState2 = "{\"next\":"+1+"}"; + String endpointConstants2 = "{\"max\":6,\"limit\":5,\"collection\":\"bulkOutputTest_2\"}"; + OutputCaller endpoint = OutputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj[1]), new StringHandle()); + IOEndpoint.CallContext[] callContextArray = {endpoint.newCallContext() + .withEndpointStateAs(endpointState2) + .withEndpointConstantsAs(endpointConstants2), endpoint.newCallContext() + .withEndpointStateAs(endpointState1) + .withEndpointConstantsAs(endpointConstants1)}; + OutputCaller.BulkOutputCaller bulkCaller = endpoint.bulkCaller(callContextArray); + Set actual = new ConcurrentSkipListSet<>(); + final AtomicBoolean duplicated = new AtomicBoolean(false); + final AtomicBoolean exceptional = new AtomicBoolean(false); + bulkCaller.setOutputListener(output -> { + try { + String serialized = output.toString(); + if (actual.contains(serialized)) { + duplicated.compareAndSet(false, true); + } else { + actual.add(serialized); + } + } catch (Exception e) { + e.printStackTrace(); + exceptional.compareAndSet(false, true); + } + }); + + bulkCaller.awaitCompletion(); + assertEquals("exceptions on calls", false, exceptional.get()); + assertEquals("duplicate output", false, duplicated.get()); + assertEquals("unexpected output count", expected.size(), actual.size()); + } + @AfterClass public static void cleanup() { diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.api b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.api new file mode 100644 index 000000000..424e685a0 --- /dev/null +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.api @@ -0,0 +1,32 @@ +{ + "endpoint": "/marklogic/ds/test/bulkIOAnyDocumentInputCaller.sjs", + "params": [ { + "name": "endpointState", + "datatype": "jsonDocument", + "multiple": false, + "nullable": true + }, { + "name": "session", + "datatype": "session", + "multiple": false, + "nullable": true + }, { + "name": "endpointConstants", + "datatype": "jsonDocument", + "multiple": false, + "nullable": true + }, { + "name": "input", + "datatype": "anyDocument", + "multiple": true, + "nullable": true + } ], + "return": { + "datatype": "anyDocument", + "multiple": false, + "nullable": true + }, + "$bulk":{ + "inputBatchSize":2 + } +} \ No newline at end of file diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.sjs b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.sjs new file mode 100644 index 000000000..1194a6234 --- /dev/null +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputCaller.sjs @@ -0,0 +1,27 @@ +'use strict'; +var endpointState; // jsonDocument? +var endpointConstants; // jsonDocument? +var input; // jsonDocument* +declareUpdate(); + +const state = fn.head(xdmp.fromJSON(endpointState)); +state.next = state.next + 1; + +const work = fn.head(xdmp.fromJSON(endpointConstants)); + +const inputs = + (input instanceof Sequence) ? input.toArray().map(item => fn.head(xdmp.fromJSON(item))) : + (input instanceof Document) ? [fn.head(xdmp.fromJSON(input))] : + [ {UNKNOWN: input} ]; + +xdmp.documentInsert( + '/marklogic/ds/test/bulkInputCaller/'+work.collection+'/'+state.next+'.json', + {state:state, work:work, inputs:inputs}, + {permissions:[ + xdmp.permission('rest-reader', 'read'), + xdmp.permission('rest-writer', 'update') + ]}) + +const returnValue = (state.next < work.max && fn.count(input) > 0) ? state : null; + +returnValue; diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.api b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.api new file mode 100644 index 000000000..3dfbb1904 --- /dev/null +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.api @@ -0,0 +1,32 @@ +{ + "endpoint": "/marklogic/ds/test/bulkIOAnyDocumentInputOutputCaller.sjs", + "params": [ { + "name": "endpointState", + "datatype": "jsonDocument", + "multiple": false, + "nullable": true + }, { + "name": "session", + "datatype": "session", + "multiple": false, + "nullable": true + }, { + "name": "endpointConstants", + "datatype": "jsonDocument", + "multiple": false, + "nullable": true + }, { + "name": "input", + "datatype": "anyDocument", + "multiple": true, + "nullable": true + } ], + "return": { + "datatype": "anyDocument", + "multiple": true, + "nullable": true + }, + "$bulk":{ + "inputBatchSize":2 + } +} \ No newline at end of file diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs new file mode 100644 index 000000000..382cd0be5 --- /dev/null +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs @@ -0,0 +1,21 @@ +'use strict'; +var endpointState; // jsonDocument? +var endpointConstants; // jsonDocument? +var input; // jsonDocument* + +const inputCount = fn.count(input); + +const work = fn.head(xdmp.fromJSON(endpointConstants)); + +const state = fn.head(xdmp.fromJSON(endpointState)); +state.next = state.next + inputCount; + +state.workMax = work.max; + +const inputs = + (input instanceof Sequence) ? input.toArray().map(item => fn.head(xdmp.fromJSON(item))) : + (input instanceof Document) ? [fn.head(xdmp.fromJSON(input))] : + [ {UNKNOWN: input} ]; + +const returnValue = (inputCount > 0) ? Sequence.from([state].concat(inputs)) : null; +returnValue; diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.api b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.api new file mode 100644 index 000000000..6a45f25b7 --- /dev/null +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.api @@ -0,0 +1,24 @@ +{ + "endpoint": "/marklogic/ds/test/bulkIOAnyDocumentOutputCaller.sjs", + "params": [ { + "name": "endpointState", + "datatype": "jsonDocument", + "multiple": false, + "nullable": true + }, { + "name": "session", + "datatype": "session", + "multiple": false, + "nullable": true + }, { + "name": "endpointConstants", + "datatype": "jsonDocument", + "multiple": false, + "nullable": true + } ], + "return": { + "datatype": "anyDocument", + "multiple": true, + "nullable": true + } +} \ No newline at end of file diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.sjs b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.sjs new file mode 100644 index 000000000..6f1a59c86 --- /dev/null +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentOutputCaller.sjs @@ -0,0 +1,19 @@ +'use strict'; + +const state = fn.head(xdmp.fromJSON(endpointState)); +const work = fn.head(xdmp.fromJSON(endpointConstants)); + +const res = []; +const d = fn.subsequence(fn.collection(work.collection), state.next, work.limit); + +state.next = state.next + work.limit; + +if(!fn.empty(d)) { +res.push(state); +for (const x of d) { +res.push(x); +}; +} + +const returnValue = Sequence.from(res); +returnValue; \ No newline at end of file From 324c9154021b6c2e0324c30e72b32d772fb8a6b6 Mon Sep 17 00:00:00 2001 From: lling Date: Fri, 24 Sep 2021 10:57:18 -0700 Subject: [PATCH 2/2] #1296 modify the unit test --- .../test/dataservices/BulkIOEndpointTest.java | 31 ++++++++++--------- .../bulkIOAnyDocumentInputOutputCaller.sjs | 6 +++- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java index 6b3953f45..0cbead437 100644 --- a/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java +++ b/marklogic-client-api/src/test/java/com/marklogic/client/test/dataservices/BulkIOEndpointTest.java @@ -118,10 +118,11 @@ public void testInputOutputCallerImpl() throws IOException { InputOutputCaller.on(IOTestUtil.db, new JacksonHandle(apiObj), new InputStreamHandle(), new InputStreamHandle() ); + IOEndpoint.CallContext callContext = endpoint.newCallContext() + .withEndpointStateAs(endpointState) + .withEndpointConstantsAs(endpointConstants); InputOutputCaller.BulkInputOutputCaller bulkCaller = - endpoint.bulkCaller(endpoint.newCallContext() - .withEndpointStateAs(endpointState) - .withEndpointConstantsAs(endpointConstants)); + endpoint.bulkCaller(callContext); bulkCaller.setOutputListener(value -> { String v = NodeConverter.InputStreamToString(value); // System.out.println("received: "+v); @@ -134,7 +135,7 @@ public void testInputOutputCallerImpl() throws IOException { }); bulkCaller.awaitCompletion(); - ObjectNode finalState = mapper.readValue(bulkCaller.getEndpointState(), ObjectNode.class); + ObjectNode finalState = mapper.readValue(callContext.getEndpointState().get(), ObjectNode.class); assertEquals("mismatch between input and output size", input.size(), output.size()); assertEquals("mismatch between input and output elements", input, output); @@ -187,25 +188,25 @@ public void testInputOutputCallerImplAnyDoc() throws IOException { Set output = new HashSet<>(); InputOutputCaller endpoint = InputOutputCaller.onHandles(IOTestUtil.db, new JacksonHandle(apiObj), new StringHandle(), new StringHandle()); - + IOEndpoint.CallContext callContext = endpoint.newCallContext() + .withEndpointStateAs(endpointState) + .withEndpointConstantsAs(endpointConstants); InputOutputCaller.BulkInputOutputCaller bulkCaller = - endpoint.bulkCaller(endpoint.newCallContext() - .withEndpointStateAs(endpointState) - .withEndpointConstantsAs(endpointConstants)); + endpoint.bulkCaller(callContext); bulkCaller.setOutputListener(value -> { String v = value.toString(); - System.out.println("received: "+v); + //System.out.println("received: "+v); output.add(v); }); input.stream().forEach(value -> { - System.out.println("adding "+value); + //System.out.println("adding "+value); bulkCaller.accept(new StringHandle(value).withFormat(Format.JSON)); }); bulkCaller.awaitCompletion(); - ObjectNode finalState = mapper.readValue(bulkCaller.getEndpointState(), ObjectNode.class); + ObjectNode finalState = mapper.readValue(callContext.getEndpointState().get(), ObjectNode.class); assertEquals("mismatch between input and output size", input.size(), output.size()); assertEquals("mismatch between input and output elements", input, output); @@ -223,10 +224,10 @@ public void testInputOutputCallerImplAnyDoc() throws IOException { assertTrue("final max not number", finalMax.isNumber()); assertEquals("mismatch on final max", workMax, finalMax.asInt()); -// JsonNode sessionCounter = finalState.get("sessionCounter"); -// assertNotNull("null final sessionCounter", sessionCounter); -// assertTrue("final sessionCounter not number", sessionCounter.isNumber()); -// assertEquals("mismatch on final sessionCounter", callCount - 1, sessionCounter.asInt()); + JsonNode sessionCounter = finalState.get("sessionCounter"); + assertNotNull("null final sessionCounter", sessionCounter); + assertTrue("final sessionCounter not number", sessionCounter.isNumber()); + assertEquals("mismatch on final sessionCounter", callCount - 1, sessionCounter.asInt()); IOTestUtil.modMgr.delete(scriptPath, apiPath); } diff --git a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs index 382cd0be5..d47a20f7c 100644 --- a/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs +++ b/marklogic-client-api/src/test/resources/dataservices/bulkIOAnyDocumentInputOutputCaller.sjs @@ -5,13 +5,17 @@ var input; // jsonDocument* const inputCount = fn.count(input); +const callCounter = fn.head(xdmp.getSessionField('counter', 0)); +xdmp.setSessionField('counter', callCounter + 1); + const work = fn.head(xdmp.fromJSON(endpointConstants)); const state = fn.head(xdmp.fromJSON(endpointState)); state.next = state.next + inputCount; - +state.sessionCounter = callCounter; state.workMax = work.max; +// workaround for bug 53438 const inputs = (input instanceof Sequence) ? input.toArray().map(item => fn.head(xdmp.fromJSON(item))) : (input instanceof Document) ? [fn.head(xdmp.fromJSON(input))] :