1
+ /*
2
+ * Copyright (c) 2021 MarkLogic Corporation
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
1
16
package com .marklogic .client .functionaltest ;
2
17
3
18
import static org .junit .Assert .assertEquals ;
4
19
import static org .junit .Assert .assertTrue ;
5
20
6
- import java .io .ByteArrayInputStream ;
7
- import java .io .File ;
8
- import java .io .FileInputStream ;
9
- import java .io .FileReader ;
10
- import java .io .IOException ;
11
- import java .io .InputStream ;
21
+ import java .io .*;
12
22
import java .nio .charset .StandardCharsets ;
13
23
import java .util .Set ;
14
24
import java .util .TreeSet ;
@@ -73,6 +83,10 @@ public class BulkIOCallersFnTest extends BasicJavaClientREST {
73
83
private static String TextIngestConfigName = "DynamicIngestServicesForText" ;
74
84
private static String BinIngestConfigName = "DynamicIngestServicesForBin" ;
75
85
86
+ // AnyDocument endpoint Ingest and Egress Config Names
87
+ private static String AnyDocumentIngestConfigName = "DynamicIngestServicesAnyDocument" ;
88
+ private static String AnyDocumentEgressConfigName = "DynamicEgressServicesAnyDocument" ;
89
+
76
90
// Egress endpoint ConfigName
77
91
private static String JsonEgressConfigName = "DynamicEgressServicesForJson" ;
78
92
@@ -97,6 +111,9 @@ public class BulkIOCallersFnTest extends BasicJavaClientREST {
97
111
private static String IngestServicesTextURI = "/dynamic/fntest/DynamicIngestServices/text/" ;
98
112
private static String IngestServicesBinURI = "/dynamic/fntest/DynamicIngestServices/bin/" ;
99
113
private static String IngestServicesJsonErrorURI = "/dynamic/fntest/DynamicIngestServicesError/json/" ;
114
+ // Any Document URIs
115
+ private static String IngestServicesAnyDocumentURI = "/dynamic/fntest/DynamicIngestServices/any/" ;
116
+ private static String EgressServicesAnyDocumentURI = "/dynamic/fntest/DynamicEgressServices/any/" ;
100
117
//Output URI
101
118
private static String EgressServicesJsonURI = "/dynamic/fntest/DynamicEgressServices/json/" ;
102
119
private static String EgressServicesJsonErrorURI = "/dynamic/fntest/DynamicEgressServicesError/json/" ;
@@ -184,6 +201,30 @@ public static void setUp() throws Exception {
184
201
file = null ;
185
202
handle = null ;
186
203
204
+ file = new File (ApiConfigDirPath + AnyDocumentIngestConfigName + ".sjs" );
205
+ handle = new FileHandle (file );
206
+ docMgr .write (IngestServicesAnyDocumentURI + AnyDocumentIngestConfigName +".sjs" , metadataHandle , handle );
207
+ file = null ;
208
+ handle = null ;
209
+
210
+ file = new File (ApiConfigDirPath + AnyDocumentIngestConfigName + ".api" );
211
+ handle = new FileHandle (file );
212
+ docMgr .write (IngestServicesAnyDocumentURI + AnyDocumentIngestConfigName +".api" , metadataHandle , handle );
213
+ file = null ;
214
+ handle = null ;
215
+
216
+ file = new File (ApiConfigDirPath + AnyDocumentEgressConfigName + ".sjs" );
217
+ handle = new FileHandle (file );
218
+ docMgr .write (EgressServicesAnyDocumentURI + AnyDocumentEgressConfigName +".sjs" , metadataHandle , handle );
219
+ file = null ;
220
+ handle = null ;
221
+
222
+ file = new File (ApiConfigDirPath + AnyDocumentEgressConfigName + ".api" );
223
+ handle = new FileHandle (file );
224
+ docMgr .write ( EgressServicesAnyDocumentURI + AnyDocumentEgressConfigName +".api" , metadataHandle , handle );
225
+ file = null ;
226
+ handle = null ;
227
+
187
228
file = new File (ApiConfigDirPath + XmlIngestConfigName + ".sjs" );
188
229
handle = new FileHandle (file );
189
230
docMgr .write (IngestServicesXmlURI + XmlIngestConfigName +".sjs" , metadataHandle , handle );
@@ -1001,4 +1042,189 @@ public void TestIngestEgressOnJsonDocsError() throws Exception {
1001
1042
}
1002
1043
}
1003
1044
1045
+ /* Use /dynamic/fntest/DynamicIngestServicesAnyDocument/any/DynamicIngestServicesAnyDocument.sjs endpoint to test any documents ingestion
1046
+ SJS module groups documents in different collections on ingest.
1047
+ Test uses same egress endpoint with anyDocument data types to retrieve different doc types (json and xml)
1048
+ Was able to retrieve multiple doc types in a single call with all doc types being in one collection.
1049
+ We would need to inspect each retrieved doc content to know what doc type will be. Refer to readline used.
1050
+ */
1051
+ @ Test
1052
+ public void TestIngestEgressOnAnyDocument () throws Exception {
1053
+ System .out .println ("Running TestIngestEgressOnAnyDocument" );
1054
+ StringBuilder batchResultsJson = new StringBuilder ();
1055
+ StringBuilder batchResultsXml = new StringBuilder ();
1056
+ StringBuilder err = new StringBuilder ();
1057
+
1058
+ String binFileName = "Pandakarlino.jpg" ;
1059
+
1060
+ try {
1061
+ int startBatchIdx = 0 ;
1062
+ int maxDocSize = 10 ;
1063
+ StringBuilder retryBuf = new StringBuilder ();
1064
+
1065
+ ObjectMapper om = new ObjectMapper ();
1066
+ File apiFile = new File (ApiConfigDirPath + AnyDocumentIngestConfigName + ".api" );
1067
+
1068
+ JsonNode api = om .readTree (new FileReader (apiFile ));
1069
+ JacksonHandle jhAPI = new JacksonHandle (api );
1070
+
1071
+ String state = "{\" next\" :" +startBatchIdx +"}" ;
1072
+ String work = "{\" max\" :" +maxDocSize +"}" ;
1073
+
1074
+ InputCaller <InputStream > ingressEndpt = InputCaller .on (dbclient , jhAPI , new InputStreamHandle ());
1075
+ InputCaller .BulkInputCaller <InputStream > inputbulkCaller = ingressEndpt .bulkCaller (ingressEndpt .newCallContext ()
1076
+ .withEndpointConstantsAs (work .getBytes ())
1077
+ .withEndpointStateAs (state ));
1078
+
1079
+ InputCaller .BulkInputCaller .ErrorListener InerrorListener =
1080
+ (retryCount , throwable , callContext , inputHandles )
1081
+ -> {
1082
+ for (BufferableHandle h :inputHandles ) {
1083
+ retryBuf .append (h .toString ());
1084
+ }
1085
+ return IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .RETRY ;
1086
+ };
1087
+
1088
+ File file1 = new File (DataConfigDirPath + "constraint1.json" );
1089
+ InputStream s1 = new FileInputStream (file1 );
1090
+ File file2 = new File (DataConfigDirPath + "constraint2.json" );
1091
+ InputStream s2 = new FileInputStream (file2 );
1092
+ File file3 = new File (DataConfigDirPath + "constraint3.json" );
1093
+ InputStream s3 = new FileInputStream (file3 );
1094
+ File file4 = new File (DataConfigDirPath + "constraint4.json" );
1095
+ InputStream s4 = new FileInputStream (file4 );
1096
+ File file5 = new File (DataConfigDirPath + "constraint5.json" );
1097
+ InputStream s5 = new FileInputStream (file5 );
1098
+ File file6 = new File (DataConfigDirPath + "cardinal1.xml" );
1099
+ InputStream s6 = new FileInputStream (file6 );
1100
+ File file7 = new File (DataConfigDirPath + "cardinal3.xml" );
1101
+ InputStream s7 = new FileInputStream (file7 );
1102
+
1103
+ FileInputStream s8 = new FileInputStream (DataConfigDirPath + binFileName );
1104
+
1105
+ String [] strContent = { "This is first test document" ,
1106
+ "This is second test document"
1107
+ };
1108
+ InputStream s9 = new ByteArrayInputStream (strContent [0 ].getBytes (StandardCharsets .UTF_8 ));
1109
+ InputStream s10 = new ByteArrayInputStream (strContent [1 ].getBytes (StandardCharsets .UTF_8 ));
1110
+
1111
+ Stream <InputStream > input = Stream .of (s1 , s2 , s3 , s4 , s5 , s6 , s7 , s8 , s9 , s10 );
1112
+ input .forEach (inputbulkCaller ::accept );
1113
+ inputbulkCaller .awaitCompletion ();
1114
+
1115
+ // Test Egress on Json docs and do the assert
1116
+ int batchStartIdx = 1 ;
1117
+ int retry = 1 ;
1118
+
1119
+ String collName = "AnyDocumentJSONCollection" ; // See Ingress module SJS doc insert()
1120
+ String returnIndex = "{\" returnIndex\" :" + batchStartIdx + "}" ;
1121
+ String workParamsJson = "{\" collectionName\" :\" " +collName +"\" , \" max\" :10}" ;
1122
+
1123
+ OutputCaller <InputStream > unloadEndpt = OutputCaller .on (dbclient , new FileHandle (new File (ApiConfigDirPath + AnyDocumentEgressConfigName + ".api" )), new InputStreamHandle ());
1124
+
1125
+ // Handle JSON doc egress using endpoint
1126
+ IOEndpoint .CallContext callContextArrayJson = unloadEndpt .newCallContext ()
1127
+ .withEndpointStateAs (returnIndex )
1128
+ .withEndpointConstantsAs (workParamsJson );
1129
+
1130
+ OutputCaller .BulkOutputCaller <InputStream > outputBulkCallerJson = unloadEndpt .bulkCaller (callContextArrayJson );
1131
+ OutputCaller .BulkOutputCaller .ErrorListener errorListenerJson =
1132
+ (retryCount , throwable , callContext )
1133
+ -> IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .SKIP_CALL ;
1134
+
1135
+ outputBulkCallerJson .setOutputListener (record -> {
1136
+ try {
1137
+ //To determine what is in the stream, is it json, xml, txt or binary
1138
+ BufferedReader bufRdr = new BufferedReader (new InputStreamReader (record , StandardCharsets .UTF_8 ));
1139
+ String chkContent = bufRdr .readLine ();
1140
+ if (chkContent .startsWith ("{" ) || chkContent .startsWith ("[" )) {
1141
+ // JSON content
1142
+ System .out .println ("JSON Content start line is " + chkContent );
1143
+ batchResultsJson .append (chkContent );
1144
+ String line ;
1145
+ while ((line = bufRdr .readLine ()) != null ) {
1146
+ batchResultsJson .append (line );
1147
+ }
1148
+ }
1149
+ } catch (Exception ex ) {
1150
+ // Might be binary file stream
1151
+ System .out .println ("Exceptions from stream read back" + ex .getMessage ());
1152
+ }
1153
+ batchResultsJson .append ("|" );
1154
+ }
1155
+ );
1156
+ outputBulkCallerJson .setErrorListener (errorListenerJson );
1157
+ outputBulkCallerJson .awaitCompletion ();
1158
+
1159
+ // Handle XML doc egress using same endpoint
1160
+ collName = "AnyDocumentXMLCollection" ;
1161
+ String workParamsXml = "{\" collectionName\" :\" " +collName +"\" , \" max\" :10}" ;
1162
+ IOEndpoint .CallContext callContextArrayXml = unloadEndpt .newCallContext ()
1163
+ .withEndpointStateAs (returnIndex )
1164
+ .withEndpointConstantsAs (workParamsXml );
1165
+
1166
+ OutputCaller .BulkOutputCaller <InputStream > outputBulkCallerXml = unloadEndpt .bulkCaller (callContextArrayXml );
1167
+ OutputCaller .BulkOutputCaller .ErrorListener errorListenerXml =
1168
+ (retryCount , throwable , callContext )
1169
+ -> IOEndpoint .BulkIOEndpointCaller .ErrorDisposition .SKIP_CALL ;
1170
+
1171
+ outputBulkCallerXml .setOutputListener (record -> {
1172
+ try {
1173
+ //To determine what is in the stream, is it json, xml, txt or binary
1174
+ BufferedReader bufRdr = new BufferedReader (new InputStreamReader (record , StandardCharsets .UTF_8 ));
1175
+ String chkContent = bufRdr .readLine ();
1176
+ if (chkContent .startsWith ("<" )) {
1177
+ // XML content
1178
+ System .out .println ("XML Content start line is " + chkContent );
1179
+ batchResultsXml .append (chkContent );
1180
+ String line ;
1181
+ while ((line = bufRdr .readLine ()) != null ) {
1182
+ batchResultsXml .append (line );
1183
+ }
1184
+ }
1185
+ } catch (Exception ex ) {
1186
+ // Might be binary file stream
1187
+ System .out .println ("Exceptions from stream read back" + ex .getMessage ());
1188
+ }
1189
+ batchResultsXml .append ("|" );
1190
+ }
1191
+ );
1192
+ outputBulkCallerXml .setErrorListener (errorListenerXml );
1193
+ outputBulkCallerXml .awaitCompletion ();
1194
+ } catch (Exception e ) {
1195
+ e .printStackTrace ();
1196
+ err .append (e .getMessage ());
1197
+ }
1198
+ finally {
1199
+ String resJson = batchResultsJson .toString ();
1200
+ String resXml = batchResultsXml .toString ();
1201
+ // # of root elements should be 5.
1202
+ System .out .println ("Json Batch results from TestIngestEgressOnAnyDocument " + resJson );
1203
+ System .out .println ("Xml Batch results from TestIngestEgressOnAnyDocument " + resXml );
1204
+ // Verify using QueryManager
1205
+ QueryManager queryMgr = dbclient .newQueryManager ();
1206
+ StructuredQueryBuilder qb = new StructuredQueryBuilder ();
1207
+ StructuredQueryDefinition qd = qb .collection ("AnyDocumentJSONCollection" );
1208
+ // create handle
1209
+ JacksonHandle resultsHandle = new JacksonHandle ();
1210
+ queryMgr .search (qd , resultsHandle );
1211
+
1212
+ // get the result
1213
+ JsonNode resultDoc = resultsHandle .get ();
1214
+ int total = resultDoc .get ("total" ).asInt ();
1215
+ assertTrue ("No of Documents returned from egressed collection incorrect." , total == 5 );
1216
+
1217
+ assertTrue ("No of Json docs egressed incorrect. Expected 5." , (resJson .split ("\\ btitle\\ b" ).length -1 ) == 5 );
1218
+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 wrote word." , (resJson .split ("\\ bwrote\\ b" ).length - 1 ) == 1 );
1219
+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 described word." , (resJson .split ("\\ bdescribed\\ b" ).length - 1 ) == 1 );
1220
+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 groundbreaking word." , (resJson .split ("\\ bgroundbreaking\\ b" ).length - 1 ) == 1 );
1221
+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 intellectual word." , (resJson .split ("\\ bintellectual\\ b" ).length - 1 ) == 1 );
1222
+ assertTrue ("No of Json docs egressed incorrect. Expected only 1 unfortunately word." , (resJson .split ("\\ bunfortunately\\ b" ).length - 1 ) == 1 );
1223
+
1224
+ assertTrue ("Xml docs egressed incorrect." , resXml .contains ("baz" ));
1225
+ assertTrue ("Xml docs egressed incorrect." , resXml .contains ("three" ));
1226
+ assertTrue ("Unexpected Errors during egress. Should not have any errors." , err .toString ().isEmpty ());
1227
+ System .out .println ("End of TestIngestEgressOnAnyDocument" );
1228
+ }
1229
+ }
1004
1230
}
0 commit comments