Skip to content

Commit 0736cc7

Browse files
llingllinggit
lling
authored andcommitted
Add currrent work of concurrency for DMSDK for suggestions ##1269
1 parent 0d7b450 commit 0736cc7

File tree

5 files changed

+449
-84
lines changed

5 files changed

+449
-84
lines changed

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/QueryBatcher.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,26 @@ public interface QueryBatcher extends Batcher {
279279
* @return this instance for method chaining
280280
*/
281281
@Override
282-
public QueryBatcher withBatchSize(int batchSize);
282+
// existing method with renamed parameter
283+
public QueryBatcher withBatchSize(int docBatchSize);
284+
285+
// new setter methods
286+
public QueryBatcher withBatchSize(int docBatchSize, int docToUriBatchRatio);
287+
288+
public QueryBatcher withBatchSize(int docBatchSize, int docToUriBatchRatio, int threadThrottleFactor);
289+
290+
// new getter methods
291+
public int getDocToUriBatchRatio();
292+
293+
public int getThreadThrottleFactor();
294+
295+
296+
// new constant methods
297+
public int getDefaultDocBatchSize();
298+
299+
public int getMaxUriBatchSize();
300+
301+
public int getMaxDocToUriBatchRatio();
283302

284303
/**
285304
* Sets the number of threads added to the internal thread pool for this

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementManagerImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ private QueryBatcher newQueryBatcherImpl(SearchQueryDefinition query) {
134134
if (Long.compareUnsigned(getServerVersion(), Long.parseUnsignedLong("10000500")) >= 0) {
135135
DataMovementServices.QueryConfig queryConfig = service.initConfig("POST", query);
136136
queryBatcher = new QueryBatcherImpl(query, this, queryConfig.forestConfig,
137-
queryConfig.serializedCtsQuery, queryConfig.filtered);
137+
queryConfig.serializedCtsQuery, queryConfig.filtered,
138+
queryConfig.maxDocToUriBatchRatio, queryConfig.defaultDocBatchSize, queryConfig.maxUriBatchSize);
138139
} else {
139140
queryBatcher = new QueryBatcherImpl(query, this, getForestConfig());
140141
}

marklogic-client-api/src/main/java/com/marklogic/client/datamovement/impl/DataMovementServices.java

+25
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,28 @@ QueryConfig initConfig(String method, SearchQueryDefinition qdef) {
7171
queryConfig.filtered = filteredResult.asBoolean();
7272
logger.debug("initialized filtering to: {}", queryConfig.filtered.toString());
7373
}
74+
JsonNode maxDocToUriBatchRatio = result.get("maxDocToUriBatchRatio");
75+
if (maxDocToUriBatchRatio != null && maxDocToUriBatchRatio.isInt()) {
76+
queryConfig.maxDocToUriBatchRatio = maxDocToUriBatchRatio.asInt();
77+
logger.debug("initialized maxDocToUriBatchRatio to : {}", queryConfig.maxDocToUriBatchRatio);
78+
} else {
79+
queryConfig.maxDocToUriBatchRatio = -1;
80+
}
81+
JsonNode defaultDocBatchSize = result.get("defaultDocBatchSize");
82+
if (defaultDocBatchSize != null && defaultDocBatchSize.isInt()) {
83+
queryConfig.defaultDocBatchSize = defaultDocBatchSize.asInt();
84+
logger.debug("initialized defaultDocBatchSize to : {}", queryConfig.defaultDocBatchSize);
85+
} else {
86+
queryConfig.defaultDocBatchSize = -1;
87+
}
88+
JsonNode maxUriBatchSize = result.get("maxUriBatchSize");
89+
if (maxUriBatchSize != null && maxUriBatchSize.isInt()) {
90+
queryConfig.maxUriBatchSize = maxUriBatchSize.asInt();
91+
logger.debug("initialized maxUriBatchSize to : {}", queryConfig.maxUriBatchSize);
92+
} else {
93+
queryConfig.maxUriBatchSize = -1;
94+
}
95+
7496
} catch (JsonProcessingException e) {
7597
logger.error("failed to initialize query", e);
7698
}
@@ -177,5 +199,8 @@ static class QueryConfig {
177199
String serializedCtsQuery;
178200
ForestConfiguration forestConfig;
179201
Boolean filtered;
202+
int maxDocToUriBatchRatio;
203+
int defaultDocBatchSize;
204+
int maxUriBatchSize;
180205
}
181206
}

0 commit comments

Comments
 (0)