
Results missing when 2 forests failover #813


Closed
srinathgit opened this issue Sep 13, 2017 · 5 comments

@srinathgit

srinathgit commented Sep 13, 2017

In the following test, I have a 3-node cluster (rh7v-intel64-90-java-stress-1/2/4.marklogic.com) with a forest on each host; the forests on rh7v-intel64-90-java-stress-2/4.marklogic.com are configured to fail over to rh7v-intel64-90-java-stress-1.marklogic.com. While the QueryBatcher is running, I stop rh7v-intel64-90-java-stress-4.marklogic.com, and the forest QBFailover-3 fails over to rh7v-intel64-90-java-stress-1.marklogic.com. After some time, I stop rh7v-intel64-90-java-stress-2.marklogic.com, but before the node timeout (30 seconds by default) elapses, I start rh7v-intel64-90-java-stress-4.marklogic.com. After 30 seconds, the forest QBFailover-2 on rh7v-intel64-90-java-stress-2.marklogic.com also fails over to rh7v-intel64-90-java-stress-1.marklogic.com. In this scenario, the total number of URIs returned is less than expected. The log is attached:

TEST-com.marklogic.client.datamovement.functionaltests.QBFailover.txt

@Test
	public void testStopTwoNodes() throws Exception{
		Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue()==0);
		try{
				
			final AtomicInteger success = new AtomicInteger(0);
			
			AtomicBoolean isNode3Running = new AtomicBoolean(true);
			AtomicBoolean isNode2Running = new AtomicBoolean(true);
								
			addDocs();
			Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue()==6000);
			
			QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform"))
					.withBatchSize(15).withThreadCount(10);
			
			QueryFailureListener[] qfl = batcher.getQueryFailureListeners();
			List<QueryFailureListener> batchListeners = Arrays.asList(qfl);
			
			batchListeners = new ArrayList<QueryFailureListener>(batchListeners);
			
			for (Iterator<QueryFailureListener> iter = batchListeners.listIterator(); iter.hasNext(); ) {
				QueryFailureListener objList = iter.next();
			    if (objList.toString().contains("com.marklogic.client.datamovement.HostAvailabilityListener")) {
			        iter.remove();
			    }
			}
			batchListeners.add( new HostAvailabilityListener(dmManager)
						.withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
						.withMinHosts(2));
			batcher.setQueryFailureListeners(batchListeners.toArray(new QueryFailureListener[batchListeners.size()]));
			  
						
			batcher.onUrisReady((batch)->{
				success.addAndGet(batch.getItems().length);
			})
			.onQueryFailure(queryException->  {	
				queryException.printStackTrace();
		       }
			); 
			ticket = dmManager.startJob(batcher);  
			while( ! batcher.isStopped() ){
				if (isNode3Running.get() && dmManager.getJobReport(ticket).getSuccessEventsCount() > 999 ){
					isNode3Running.set(false);
					serverStartStop(hostNames[hostNames.length -1], "stop");
				}
				if (isNode2Running.get() && dmManager.getJobReport(ticket).getSuccessEventsCount() > 2999 ){
					isNode2Running.set(false);
					serverStartStop(hostNames[hostNames.length -2], "stop");
					Thread.sleep(5000L);
					serverStartStop(hostNames[hostNames.length -1], "start");
				}
			}
			batcher.awaitCompletion();
			
			dmManager.stopJob(ticket);
			System.out.println("Success "+ success.intValue());
			assertEquals("document count", 6000,success.intValue()); 		
		}
		catch(Exception e){
			e.printStackTrace();
		}
		
	}

private void addDocs(){
		WriteBatcher ihb2 =  dmManager.newWriteBatcher();
		stringTriple = "<abc>xml</abc>";
		stringHandle = new StringHandle(stringTriple);
		stringHandle.setFormat(Format.XML);
		meta2 = new DocumentMetadataHandle().withCollections("XmlTransform");
		meta2.setFormat(Format.XML);
		ihb2.withBatchSize(27).withThreadCount(10);
		ihb2.onBatchSuccess(
				batch -> {	}
				)
		.onBatchFailure(
				(batch, throwable) -> {
					throwable.printStackTrace();
				});
	
		dmManager.startJob(ihb2);
		
		for (int j =0 ;j < 6000; j++){
			String uri ="/local/string-"+ j;
			ihb2.add(uri, meta2, stringHandle);
		}
	
		ihb2.flushAndWait();
	}
@vivekmuniyandi
Contributor

When a host is being brought down or is in the process of shutting down, there may be scenarios where a request gets no response. With OkHttp in place, we get an EOF exception indicating an end of stream on the connection, so we have to take that into account during DMSDK failover. We have added a new listener called NoResponseListener, a subclass of HostAvailabilityListener, which handles empty responses from the server. The reason we created a new listener rather than adding the exception to HostAvailabilityListener's list of exceptions is that the latter would also automatically retry listeners that failed to apply after a set of URIs had been retrieved from the server. This is undesirable for ApplyTransformListener, where an empty response is ambiguous: the transform may already have been applied to the batch, or the batch may not have been processed yet. Since transforms need not be idempotent, we shouldn't do a blind retry. Hence we created the new listener, which is registered by default on both QueryBatcher and WriteBatcher instances when they are created.
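To illustrate why a blind retry is only safe for idempotent transforms, here is a minimal, self-contained sketch. It does not use the MarkLogic API: the two transforms (`SET_STATUS`, `APPEND_AUDIT`) and the helper `sameAfterSecondApply` are hypothetical stand-ins operating on plain strings, and the check covers a single sample document only.

```java
import java.util.function.UnaryOperator;

final class IdempotencyDemo {
    // Hypothetical transform: adds a status marker once; running it again changes nothing.
    static final UnaryOperator<String> SET_STATUS =
        doc -> doc.contains("<status>") ? doc : doc + "<status>done</status>";

    // Hypothetical transform: appends an audit entry on every run, so it is not idempotent.
    static final UnaryOperator<String> APPEND_AUDIT = doc -> doc + "<audit/>";

    // Returns true when applying the transform a second time leaves this document unchanged,
    // i.e. a retry after an ambiguous (empty) response would be harmless for this document.
    static boolean sameAfterSecondApply(UnaryOperator<String> transform, String doc) {
        String once = transform.apply(doc);
        String twice = transform.apply(once);
        return once.equals(twice);
    }

    public static void main(String[] args) {
        String doc = "<abc>xml</abc>";
        System.out.println(sameAfterSecondApply(SET_STATUS, doc));   // prints true
        System.out.println(sameAfterSecondApply(APPEND_AUDIT, doc)); // prints false
    }
}
```

If the first attempt actually reached the server before the connection dropped, retrying `APPEND_AUDIT` would leave two audit entries, which is the situation the separate listener is meant to avoid.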

For idempotent listeners like DeleteListener, and even for WriteBatcher, we can do a blind retry. For WriteBatcher, since the NoResponseListener is registered by default, the listener retries and gets the batch written to the server. For idempotent listeners, the retry is set up by initializing the RetryListener of NoResponseListener and adding it to the onFailure listeners, similar to what we did for HostAvailabilityListener; this should be done in the overridden initializeListener(QueryBatcher).

For listeners like ApplyTransformListener, there are two cases:

  1. The transform is idempotent: this case can be handled like the idempotent listeners described above.
  2. The transform is not idempotent: this case should be handled by the customer. The customer should create a BatchFailureListener that checks each batch for which we got an empty response, asks the server whether the transform has already been applied, and retries the batch only when it has not.
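The check-then-retry logic for the non-idempotent case could look roughly like the sketch below. It is plain Java, not DMSDK code: `urisToRetry` and the `alreadyTransformed` predicate are hypothetical names, the in-memory set stands in for a real query against the server, and the check is done per URI here as an assumption (the description above speaks of retrying per batch). In a real job this logic would live inside the customer's BatchFailureListener.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class SelectiveRetryDemo {
    // Returns the URIs of a failed batch that still need the transform.
    // alreadyTransformed is a hypothetical check; in practice it would ask the server.
    static List<String> urisToRetry(String[] batchUris, Predicate<String> alreadyTransformed) {
        List<String> pending = new ArrayList<>();
        for (String uri : batchUris) {
            if (!alreadyTransformed.test(uri)) {
                pending.add(uri);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        // Stand-in for server state: documents the transform reached before the response was lost.
        Set<String> transformedOnServer =
            new HashSet<>(Arrays.asList("/local/string-0", "/local/string-2"));
        String[] failedBatch = {
            "/local/string-0", "/local/string-1", "/local/string-2", "/local/string-3"
        };
        List<String> pending = urisToRetry(failedBatch, transformedOnServer::contains);
        System.out.println(pending); // prints [/local/string-1, /local/string-3]
    }
}
```

How "already transformed" is detected depends on the transform itself (for example, a marker element or property it writes), so that part has to be supplied by whoever owns the transform.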

@jmakeig
Contributor

jmakeig commented Sep 29, 2017

Is there a way for a user to flag her ApplyTransformListener as idempotent so the retry can be applied automatically, or does the user have to know to retry in her failure handler? ApplyTransformListener.initializeListener(QueryBatcher queryBatcher) is documented as “This default method should be implemented by custom listeners that should be retried in case of failover.” It's not clear to me what that means, though. What should the implementation do?

@srinathgit
Author

srinathgit commented Sep 29, 2017

If the transform is idempotent, the user has to know to retry in their failure handler, for example:

ApplyTransformListener listener = new ApplyTransformListener().withTransform(transform)
				.withApplyResult(ApplyResult.REPLACE).onSuccess(batch -> {
					success.addAndGet(batch.getItems().length);
				}).onSkipped(batch -> {
					skipped.addAndGet(batch.getItems().length);
				});
QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform")).onUrisReady(listener).withBatchSize(10).withThreadCount(5);
NoResponseListener noResponseListener = NoResponseListener.getInstance(batcher);
if (noResponseListener != null) {
	BatchFailureListener<QueryBatch> retryListener = noResponseListener.initializeRetryListener(listener);
	if (retryListener != null) {
		listener.onFailure(retryListener);
	}
}

@srinathgit
Author

I am now able to obtain all URIs after failover.

@vivekmuniyandi
Contributor

vivekmuniyandi commented Oct 6, 2017

It's not clear to me what that means, though. What should the implementation do?

@jmakeig A sample implementation is available in DeleteListener:

 @Override
  public void initializeListener(QueryBatcher queryBatcher) {
    HostAvailabilityListener hostAvailabilityListener = HostAvailabilityListener.getInstance(queryBatcher);
    if ( hostAvailabilityListener != null ) {
      BatchFailureListener<QueryBatch> retryListener = hostAvailabilityListener.initializeRetryListener(this);
      if ( retryListener != null )  onFailure(retryListener);
    }
    NoResponseListener noResponseListener = NoResponseListener.getInstance(queryBatcher);
    if ( noResponseListener != null ) {
      BatchFailureListener<QueryBatch> noResponseRetryListener = noResponseListener.initializeRetryListener(this);
      if ( noResponseRetryListener != null )  onFailure(noResponseRetryListener);
    }
  }

For ApplyTransformListener alone, we have to do what Srinath described in #813 (comment) if the transform is idempotent.
