
URIs missing during failover #814


Closed
srinathgit opened this issue Sep 13, 2017 · 5 comments

Comments

@srinathgit

srinathgit commented Sep 13, 2017

In the following test, I have a 3-node cluster (rh7v-intel64-90-java-stress-1/2/4.marklogic.com) with one forest on each host. The forests on rh7v-intel64-90-java-stress-2/4.marklogic.com are configured to fail over to rh7v-intel64-90-java-stress-1.marklogic.com. While the QueryBatcher is running, I stop and start rh7v-intel64-90-java-stress-4.marklogic.com several times: first for less than the batcher's suspend time, then for longer than the suspend time, and finally for longer than the node timeout, which makes forest QBFailover-3 fail over to rh7v-intel64-90-java-stress-1.marklogic.com. In this scenario, the total number of URIs returned is less than expected. The log is attached:
TEST-com.marklogic.client.datamovement.functionaltests.QBFailover.txt

@Test
	public void testRepeatedStopOneNode() throws Exception{
		Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue()==0);
		addDocs();
		Assert.assertTrue(dbClient.newServerEval().xquery(query1).eval().next().getNumber().intValue()==6000);
		
		
		AtomicInteger success = new AtomicInteger(0);
		AtomicInteger failure = new AtomicInteger(0);
		AtomicBoolean isRunning = new AtomicBoolean(true);
		
		QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform"))
				.withBatchSize(50).withThreadCount(3);
		
		// Replace the default HostAvailabilityListener with one that suspends an
		// unavailable host for 15 seconds and requires at least 2 available hosts.
		List<QueryFailureListener> batchListeners =
				new ArrayList<>(Arrays.asList(batcher.getQueryFailureListeners()));
		batchListeners.removeIf(listener -> listener instanceof HostAvailabilityListener);
		batchListeners.add( new HostAvailabilityListener(dmManager)
					.withSuspendTimeForHostUnavailable(Duration.ofSeconds(15))
					.withMinHosts(2));
		batcher.setQueryFailureListeners(batchListeners.toArray(new QueryFailureListener[batchListeners.size()]));
			
		batcher.onUrisReady((batch)->{
			success.addAndGet(batch.getItems().length);
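		}).onQueryFailure(queryException -> {
			// Count every query failure so that the final failure assertion is meaningful.
			failure.incrementAndGet();
			queryException.printStackTrace();
		});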
			
		ticket = dmManager.startJob( batcher );
		while (!batcher.isStopped()) {
			if (dmManager.getJobReport(ticket).getSuccessEventsCount() > 99 && isRunning.get()) {
				isRunning.set(false);
				String lastHost = hostNames[hostNames.length - 1];
				// Outage 1: 6 s, shorter than the 15 s suspend time
				serverStartStop(lastHost, "stop");
				Thread.sleep(6000L);
				serverStartStop(lastHost, "start");
				Thread.sleep(6000L);
				// Outage 2: 18 s, longer than the suspend time
				serverStartStop(lastHost, "stop");
				Thread.sleep(18000L);
				serverStartStop(lastHost, "start");
				Thread.sleep(6000L);
				// Outage 3: the host stays down past the node timeout, so QBFailover-3 fails over
				serverStartStop(lastHost, "stop");
			}
		}
		batcher.awaitCompletion();
		dmManager.stopJob(ticket);
		System.out.println("Success "+ success.intValue());
		System.out.println("Failure "+failure.intValue());
		
		assertEquals("document count", 6000, success.intValue());
		assertEquals("failure count", 0, failure.intValue());
	}
private void addDocs(){
		WriteBatcher ihb2 =  dmManager.newWriteBatcher();
		stringTriple = "<abc>xml</abc>";
		stringHandle = new StringHandle(stringTriple);
		stringHandle.setFormat(Format.XML);
		meta2 = new DocumentMetadataHandle().withCollections("XmlTransform");
		meta2.setFormat(Format.XML);
		ihb2.withBatchSize(27).withThreadCount(10);
		ihb2.onBatchSuccess(
				batch -> {	}
				)
		.onBatchFailure(
				(batch, throwable) -> {
					throwable.printStackTrace();
				});
	
		dmManager.startJob(ihb2);
		
		for (int j =0 ;j < 6000; j++){
			String uri ="/local/string-"+ j;
			ihb2.add(uri, meta2, stringHandle);
		}
	
		ihb2.flushAndWait();
	}
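The test only sums batch sizes. As a result, a run that drops some URIs and delivers others twice could still reach 6000. The following is a minimal sketch of a tracker that records each URI in a concurrent set and reports which expected URIs never arrived. It assumes the URIs follow the `/local/string-N` pattern used in `addDocs()`. The class name `UriTracker` is hypothetical and is not part of the MarkLogic Java Client API. In the test, its `accept` method would be called from `onUrisReady` with `batch.getItems()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: tracks distinct URIs delivered by a batcher so that
// missing URIs and duplicated URIs can be told apart.
public class UriTracker {
    // Thread-safe set, since onUrisReady may run on several batcher threads.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private final AtomicInteger duplicates = new AtomicInteger();

    // Call once per batch, e.g. tracker.accept(batch.getItems()) inside onUrisReady.
    public void accept(String[] uris) {
        for (String uri : uris) {
            if (!seen.add(uri)) {
                duplicates.incrementAndGet();
            }
        }
    }

    public int distinctCount() { return seen.size(); }

    public int duplicateCount() { return duplicates.get(); }

    // Lists expected URIs (same pattern as addDocs()) that were never delivered.
    public List<String> missing(int expectedCount) {
        List<String> result = new ArrayList<>();
        for (int j = 0; j < expectedCount; j++) {
            String uri = "/local/string-" + j;
            if (!seen.contains(uri)) {
                result.add(uri);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        UriTracker tracker = new UriTracker();
        tracker.accept(new String[] {"/local/string-0", "/local/string-1"});
        tracker.accept(new String[] {"/local/string-1", "/local/string-3"});
        // prints "distinct=3 duplicates=1 missing=[/local/string-2]"
        System.out.println("distinct=" + tracker.distinctCount()
                + " duplicates=" + tracker.duplicateCount()
                + " missing=" + tracker.missing(4));
    }
}
```

With this in place, asserting that `tracker.missing(6000)` is empty and that `tracker.duplicateCount()` is 0 would name the lost URIs instead of only reporting a short total.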
@vivekmuniyandi
Contributor

I cannot reproduce this on my cluster. Both this issue and #813 failed with the same exception:

java.io.IOException: unexpected end of stream on Connection{rh7v-intel64-90-java-stress-4.marklogic.com:8000, proxy=DIRECT hostAddress=rh7v-intel64-90-java-stress-4.marklogic.com/172.18.132.248:8000 cipherSuite=none protocol=http/1.1}
	at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:205)
	at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.java:75)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:45)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:120)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at com.burgstaller.okhttp.AuthenticationCacheInterceptor.intercept(AuthenticationCacheInterceptor.java:45)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
	at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:185)
	at okhttp3.RealCall.execute(RealCall.java:69)
	at com.marklogic.client.impl.OkHttpServices.sendRequestOnce(OkHttpServices.java:674)
	... 13 more
Caused by: java.io.EOFException: \n not found: limit=0 content=…
	at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:227)
	at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.java:211)
	at okhttp3.internal.http1.Http1Codec.readResponseHeaders(Http1Codec.java:189)
	... 32 more

A similar error has been reported against OkHttp, where it was attributed to the server side. I will try this on the stress cluster next week.

@vivekmuniyandi
Contributor

I have filed square/okhttp#3629 against OkHttp, requesting a more generic exception for this issue. In the meantime, I am adding a custom listener, similar to HostAvailabilityListener, to handle cases where the server sends no response.
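The key step in such a listener is recognizing a "no response" failure from the exception's cause chain. The following is a minimal, JDK-only sketch. It assumes the signature shown in the stack trace in this thread: an `IOException` whose message starts with `unexpected end of stream`, caused by a `java.io.EOFException`. The class `NoResponseDetector` is hypothetical and is not the Java Client API's actual listener. A real listener would call a check like this from its failure callback before deciding whether to retry the batch.

```java
import java.io.EOFException;
import java.io.IOException;

// Hypothetical helper (not part of the MarkLogic Java Client API): decides whether
// a failure looks like the server closed the connection without sending a response.
public final class NoResponseDetector {
    private NoResponseDetector() {}

    public static boolean isNoResponse(Throwable failure) {
        // Walk the cause chain; getCause() returns null at the end of the chain.
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof EOFException) {
                return true;
            }
            if (t instanceof IOException && t.getMessage() != null
                    && t.getMessage().startsWith("unexpected end of stream")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mimics the shape of the reported trace; the host name is a placeholder.
        IOException eof = new IOException(
                "unexpected end of stream on Connection{example-host:8000}",
                new EOFException("\\n not found: limit=0"));
        RuntimeException wrapped = new RuntimeException("request failed", eof);
        System.out.println(isNoResponse(wrapped));                               // true
        System.out.println(isNoResponse(new IOException("connection refused"))); // false
    }
}
```

Matching on the exception type and on the message prefix together keeps the check working if either the wrapper message or the `EOFException` cause changes. Matching on message text is fragile across OkHttp versions, however, which is presumably why a more generic exception was requested upstream.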

@srinathgit
Author

After the fix, all the URIs are returned.

@reshbu

reshbu commented Aug 26, 2020

@vivekmuniyandi, how did you finally solve this? I am facing a similar issue and cannot tell whether it is a server issue or an OkHttp connection issue on mobile.
okhttpexception.txt

@ehennum
Contributor

ehennum commented Aug 26, 2020

@reshbu, are you running into a problem with the MarkLogic Java API? If so, please work with your support contact to provide a reproducible case.

If not, please raise OkHttp questions on the OkHttp repository or on a general resource such as Stack Overflow.


4 participants