Skip to content

Develop 1296 #1303

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 16, 2021
Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package com.marklogic.client.dataservices;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.InputEndpointImpl;
import com.marklogic.client.io.marker.BufferableContentHandle;
import com.marklogic.client.io.marker.BufferableHandle;
Expand All @@ -37,7 +38,7 @@ public interface InputCaller<I> extends IOEndpoint {
* @return the InputCaller instance for calling the endpoint.
*/
static <I> InputCaller<I> on(DatabaseClient client, JSONWriteHandle apiDecl, BufferableContentHandle<I,?> inputHandle) {
return new InputEndpointImpl(client, apiDecl, inputHandle);
return new InputEndpointImpl(client, apiDecl, new HandleProvider.ContentHandleProvider<>(inputHandle,null));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.SessionState;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.InputEndpointImpl;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.client.io.marker.JSONWriteHandle;
Expand All @@ -39,7 +40,7 @@ static InputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
final class EndpointLocal<O> extends InputEndpointImpl<InputStream,O>
implements InputEndpoint {
private EndpointLocal(DatabaseClient client, JSONWriteHandle apiDecl) {
super(client, apiDecl, new InputStreamHandle());
super(client, apiDecl, new HandleProvider.ContentHandleProvider<>(new InputStreamHandle(), null));
}
public InputEndpoint.BulkInputCaller bulkCaller() {
return new BulkLocal(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.marklogic.client.dataservices;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.InputOutputEndpointImpl;
import com.marklogic.client.io.marker.BufferableContentHandle;
import com.marklogic.client.io.marker.BufferableHandle;
Expand Down Expand Up @@ -44,9 +45,9 @@ public interface InputOutputCaller<I,O> extends IOEndpoint {
*/
static <I,O> InputOutputCaller<I,O> on(
DatabaseClient client, JSONWriteHandle apiDecl,
BufferableContentHandle<I,?> inputHandle, BufferableContentHandle<I,?> outputHandle
BufferableContentHandle<I,?> inputHandle, BufferableContentHandle<O,?> outputHandle
) {
return new InputOutputEndpointImpl(client, apiDecl, inputHandle, outputHandle);
return new InputOutputEndpointImpl<>(client, apiDecl, new HandleProvider.ContentHandleProvider<>(inputHandle, outputHandle));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.SessionState;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.InputOutputEndpointImpl;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.client.io.marker.JSONWriteHandle;
Expand All @@ -40,7 +41,7 @@ static InputOutputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
final class EndpointLocal extends InputOutputEndpointImpl<InputStream,InputStream>
implements InputOutputEndpoint {
private EndpointLocal(DatabaseClient client, JSONWriteHandle apiDecl) {
super(client, apiDecl, new InputStreamHandle(), new InputStreamHandle());
super(client, apiDecl, new HandleProvider.ContentHandleProvider<>(new InputStreamHandle(), new InputStreamHandle()));
}
public InputOutputEndpoint.BulkInputOutputCaller bulkCaller() {
return new BulkLocal(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package com.marklogic.client.dataservices;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.OutputEndpointImpl;
import com.marklogic.client.io.marker.BufferableContentHandle;
import com.marklogic.client.io.marker.JSONWriteHandle;
Expand All @@ -40,7 +41,7 @@ public interface OutputCaller<O> extends IOEndpoint {
static <O> OutputCaller<O> on(
DatabaseClient client, JSONWriteHandle apiDecl, BufferableContentHandle<O,?> outputHandle
) {
return new OutputEndpointImpl(client, apiDecl, outputHandle);
return new OutputEndpointImpl(client, apiDecl, new HandleProvider.ContentHandleProvider<>(null, outputHandle));
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.SessionState;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.OutputEndpointImpl;
import com.marklogic.client.io.InputStreamHandle;
import com.marklogic.client.io.marker.JSONWriteHandle;
Expand All @@ -39,7 +40,7 @@ static OutputEndpoint on(DatabaseClient client, JSONWriteHandle apiDecl) {
final class EndpointLocal<I> extends OutputEndpointImpl<I,InputStream>
implements OutputEndpoint {
private EndpointLocal(DatabaseClient client, JSONWriteHandle apiDecl) {
super(client, apiDecl, new InputStreamHandle());
super(client, apiDecl, new HandleProvider.ContentHandleProvider<>(null, new InputStreamHandle()));
}
public OutputEndpoint.BulkOutputCaller bulkCaller() {
return new BulkLocal(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@

final public class ExecCallerImpl<I,O> extends IOCallerImpl<I,O> {
public ExecCallerImpl(JSONWriteHandle apiDeclaration) {
super(apiDeclaration, null, null);
super(apiDeclaration, new HandleProvider.ContentHandleProvider<>(null, null));

if (getInputParamdef() != null) {
throw new IllegalArgumentException("input parameter not supported in endpoint: "+ getEndpointPath());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 MarkLogic Corporation
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -127,7 +127,7 @@ public void awaitCompletion() {
else if(getCallContextQueue() != null && !getCallContextQueue().isEmpty()){
try {
for (int i = 0; i < getThreadCount(); i++) {
BulkCallableImpl bulkCallableImpl = new BulkCallableImpl(this);
BulkCallableImpl<I,O> bulkCallableImpl = new BulkCallableImpl(this);
submitTask(bulkCallableImpl);
}
if(getCallerThreadPoolExecutor() != null)
Expand Down Expand Up @@ -235,8 +235,7 @@ private boolean processOutput(CallContextImpl<I,O> callContext){
}
}

// TODO: static private class BulkCallableImpl<I,O>
private class BulkCallableImpl implements Callable<Boolean> {
static private class BulkCallableImpl<I,O> implements Callable<Boolean> {
private final BulkExecCallerImpl<I,O> bulkExecCallerImpl;

BulkCallableImpl(BulkExecCallerImpl<I,O> bulkExecCallerImpl) {
Expand All @@ -251,11 +250,11 @@ public Boolean call() throws InterruptedException{

if(continueCalling) {
bulkExecCallerImpl.getCallContextQueue().put(callContext);
submitTask(this);
bulkExecCallerImpl.submitTask(this);
}
else {
if (aliveCallContextCount.decrementAndGet() == 0 && getCallerThreadPoolExecutor() != null) {
getCallerThreadPoolExecutor().shutdown();
if (bulkExecCallerImpl.aliveCallContextCount.decrementAndGet() == 0 && bulkExecCallerImpl.getCallerThreadPoolExecutor() != null) {
bulkExecCallerImpl.getCallerThreadPoolExecutor().shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright (c) 2021 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.marklogic.client.dataservices.impl;

import com.marklogic.client.impl.RESTServices;
import com.marklogic.client.io.marker.BufferableContentHandle;

public interface HandleProvider<I, O> {
BufferableContentHandle<?, ?> getInputHandle();
BufferableContentHandle<?, ?> getOutputHandle();
I[] newInputArray(int length);
O[] newOutputArray(int length);
BufferableContentHandle<?, ?>[] bufferableInputHandleOn(I[] input);
O[] outputAsArray(CallContextImpl<I, O> callCtxt, RESTServices.MultipleCallResponse response);

class ContentHandleProvider<I,O> implements HandleProvider<I,O> {
private final BufferableContentHandle<I,?> inputHandle;
private final BufferableContentHandle<O,?> outputHandle;
public ContentHandleProvider(BufferableContentHandle<I,?> inputHandle, BufferableContentHandle<O,?> outputHandle) {
this.inputHandle = inputHandle;
this.outputHandle = outputHandle;
}
@Override
public BufferableContentHandle<I,?> getInputHandle() {
return inputHandle;
}
@Override
public BufferableContentHandle<O,?> getOutputHandle() {
return outputHandle;
}
@Override
public I[] newInputArray(int length) {
if (inputHandle == null) {
throw new IllegalStateException("No input handle provided");
}
return inputHandle.newArray(length);
}
@Override
public O[] newOutputArray(int length) {
if (outputHandle == null) {
throw new IllegalStateException("No output handle provided");
}
return outputHandle.newArray(length);
}
@Override
public BufferableContentHandle<?,?>[] bufferableInputHandleOn(I[] input) {
if (inputHandle == null) {
throw new IllegalStateException("No input handle provided");
}
return inputHandle.resendableHandleFor(input);
}
@Override
public O[] outputAsArray(CallContextImpl<I,O> callCtxt, RESTServices.MultipleCallResponse response) {
if (outputHandle == null) {
throw new IllegalStateException("No output handle provided");
}
return response.asArrayOfContent(
callCtxt.isLegacyContext() ? null : callCtxt.getEndpointState(), outputHandle
);
}
}

class DirectHandleProvider<IC,IR,OC,OR>
implements HandleProvider<BufferableContentHandle<IC,IR>, BufferableContentHandle<OC,OR>> {
private final BufferableContentHandle<IC,IR> inputHandle;
private final BufferableContentHandle<OC,OR> outputHandle;
public DirectHandleProvider(BufferableContentHandle<IC,IR> inputHandle, BufferableContentHandle<OC,OR> outputHandle) {
this.inputHandle = inputHandle;
this.outputHandle = outputHandle;
}
@Override
public BufferableContentHandle<IC,IR> getInputHandle() {
return inputHandle;
}
@Override
public BufferableContentHandle<OC,OR> getOutputHandle() {
return outputHandle;
}
@Override
public BufferableContentHandle<IC,IR>[] newInputArray(int length) {
if (inputHandle == null) {
throw new IllegalStateException("No input handle provided");
}
return inputHandle.newHandleArray(length);
}
@Override
public BufferableContentHandle<OC,OR>[] newOutputArray(int length) {
if (outputHandle == null) {
throw new IllegalStateException("No output handle provided");
}
return outputHandle.newHandleArray(length);
}
@Override
public BufferableContentHandle<IC,IR>[] bufferableInputHandleOn(BufferableContentHandle<IC,IR>[] input) {
return input;
}
@Override
public BufferableContentHandle<OC,OR>[] outputAsArray(
CallContextImpl<BufferableContentHandle<IC,IR>, BufferableContentHandle<OC,OR>> callCtxt,
RESTServices.MultipleCallResponse response) {
if (outputHandle == null) {
throw new IllegalStateException("No output handle provided");
}
return response.asArrayOfHandles(callCtxt.getEndpointState(), outputHandle);
}
}
}
Loading