Another "FtpSession - failed to disconnect FTPClient" #3168

Closed
jonforums opened this issue Feb 5, 2020 · 22 comments · Fixed by #3174

@jonforums

Affects Version(s): 5.2.3


Recently upgraded from 5.1.9 to 5.2.3 and now seeing FTP warnings that never happened with 5.1.9. My issue appears similar to #3123.

Relevant config is the following. Spring Integration code is interacting with a vsftpd server running on Ubuntu Server.

    @Bean
    @Profile("production")
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setClientMode(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE);
        sf.setHost(env.getProperty("ftp.host", "XX.XX.XX.XX"));
        sf.setPort(env.getProperty("ftp.port", Integer.class, 21));
        sf.setUsername(env.getProperty("ftp.username", "XXXX"));
        sf.setPassword(decrypter.decrypt(env.getProperty("ftp.password", "XXXX")));

        // NOTE: in prod the session cache size is 4 and the timeout is 1000
        CachingSessionFactory<FTPFile> sessionCache =
            new CachingSessionFactory<>(sf, env.getProperty("ftp.session.cache", Integer.class, 8));
        sessionCache.setSessionWaitTimeout(env.getProperty("ftp.session.waitimeout", Long.class, 2000L));

        return sessionCache;
    }

    @Bean
    public Advice xmitAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");

        advice.setOnFailureExpressionString("'transmit failure: ' + #exception.cause.message");
        advice.setFailureChannelName("inputErrorChannel");
        advice.setTrapException(true);

        return advice;
    }

    @Bean
    @Profile("production")
    @ServiceActivator(inputChannel = "workQueue", adviceChain = "xmitAdvice")
    public MessageHandler ftpHandler() {
        SpelExpressionParser parser = new SpelExpressionParser();
        Expression dirExpression =
            parser.parseExpression("headers[T(com.XXX.YYY.CustomHeaders).OUTPUT_DIR]");

        FtpMessageHandler ftp = new FtpMessageHandler(ftpSessionFactory());
        ftp.setRemoteDirectoryExpression(dirExpression);
        ftp.setFileNameGenerator(outputFileNameGenerator);

        return ftp;
    }

The following is the most common exception with multiple occurrences:

2020-02-05T09:53:41.558 [taskScheduler-12] WARN  org.springframework.integration.ftp.session.FtpSession - failed to disconnect FTPClient
java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
	at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
	at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
	at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
	at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
	at java.io.BufferedWriter.flush(BufferedWriter.java:254)
	at org.apache.commons.net.ftp.FTP.__send(FTP.java:545)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:519)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:648)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:622)
	at org.apache.commons.net.ftp.FTP.quit(FTP.java:904)
	at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:1148)
	at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:159)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:93)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:80)
	at org.springframework.integration.util.SimplePool.doRemoveItem(SimplePool.java:256)
	at org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:202)
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:173)
	at org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:132)
	at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:298)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:286)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:278)
	at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:205)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
	at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler.handleRequestMessage(AbstractReplyProducingMessageHandler.java:201)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$CallbackImpl.execute(AbstractRequestHandlerAdvice.java:151)
	at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.doInvoke(ExpressionEvaluatingRequestHandlerAdvice.java:237)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:67)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy37.handleRequestMessage(Unknown Source)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:146)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:130)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
	at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:396)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:380)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

However, there is one occurrence of this exception:

2020-02-04T17:35:45.366 [taskScheduler-7] WARN  org.springframework.integration.ftp.session.FtpSession - failed to disconnect FTPClient
org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
	at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:324)
	at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:300)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:523)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:648)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:622)
	at org.apache.commons.net.ftp.FTP.quit(FTP.java:904)
	at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:1148)
	at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:159)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:93)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:80)
	at org.springframework.integration.util.SimplePool.doRemoveItem(SimplePool.java:256)
	at org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:202)
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:173)
	at org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:132)
	at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:298)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:286)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:278)
	at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:205)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
	at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler.handleRequestMessage(AbstractReplyProducingMessageHandler.java:201)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$CallbackImpl.execute(AbstractRequestHandlerAdvice.java:151)
	at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.doInvoke(ExpressionEvaluatingRequestHandlerAdvice.java:237)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:67)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy37.handleRequestMessage(Unknown Source)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:146)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:130)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
	at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:396)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:380)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
@artembilan
Member

This is 5.1.x branch code:

public void close() {
		try {
			if (this.readingRaw.get()) {
				if (!finalizeRaw()) {
					if (this.logger.isWarnEnabled()) {
						this.logger.warn("Finalize on readRaw() returned false for " + this);
					}
				}
			}
			this.client.logout();
			this.client.disconnect();
		}
		catch (Exception e) {
			if (this.logger.isWarnEnabled()) {
				this.logger.warn("failed to disconnect FTPClient", e);
			}
		}
	}

and this is 5.2.x:

public void close() {
		try {
			if (this.readingRaw.get() && !finalizeRaw() && LOGGER.isWarnEnabled()) {
				LOGGER.warn("Finalize on readRaw() returned false for " + this);
			}
			if (this.client.isConnected()) {
				this.client.logout();
			}
			this.client.disconnect();
		}
		catch (Exception e) {
			LOGGER.warn("failed to disconnect FTPClient", e);
		}
	}

So, technically there is no difference: client.logout() has always been called, so you should see that WARN regardless of the version.

On the other hand, it is just a WARN. We could move it to INFO, since it doesn't look like a big problem when we are closing the session anyway...

Therefore I don't see how the fix for #3123 could affect you.
It might be some other fix in another place, like SimplePool or CachingSessionFactory.

I mean, you probably didn't see that WARN in 5.1.9 because FtpSession.close() wasn't called at all.

Consider just ignoring that WARN message in your logs, or let us know if it would be better to move it to INFO!

I don't see anything wrong with the logic we have so far, so I'm not sure what you think the bug is.

Thank you for understanding!

@artembilan added the status: waiting-for-reporter and in: ftp labels on Feb 5, 2020
@jonforums
Author

I don't think changing logging levels is right since we don't yet know root cause.

The key issue is why these new WARN messages started filling my logs after upgrading to 5.2.3 with no config changes on my end. Is my SI config wrong, or is there a weakness in some SI code?

As best I can tell, the new WARNs do not mean my FTP file transfers failed. But how do we verify that SimplePool or CachingSessionFactory don't have a lurking weakness (threading, race?) causing the new WARNs?

If you want me to set specific loggers to DEBUG or TRACE to help track things down, let me know which loggers to update.

@garyrussell
Contributor

Interesting; looking at the Apache code, I don't see any side effects of calling isConnected() before calling logout(); it just looks at field values.

So the only side effect would be some tiny timing differences. So, I am not sure why you would see new exceptions there.

However, it does look like we should try/catch around the logout so we don't skip the disconnect().
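A minimal sketch of that idea, assuming a hypothetical `ControlConnection` interface that only mirrors the two client calls used by close() (it is not the Commons Net API, and this is not necessarily how the eventual fix looks). Each call gets its own try/catch, so a failing logout() cannot skip disconnect():

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two client calls used by close();
// it only mirrors their shape and is not the Commons Net API.
interface ControlConnection {
    boolean logout() throws IOException;
    void disconnect() throws IOException;
}

final class SafeCloser {
    // Warnings are collected instead of logged to keep the sketch dependency-free.
    final List<String> warnings = new ArrayList<>();

    void close(ControlConnection client) {
        try {
            client.logout(); // sends QUIT; may fail if the server already dropped us
        }
        catch (IOException e) {
            warnings.add("failed to log out: " + e.getMessage());
        }
        try {
            client.disconnect(); // always attempted, even when logout() failed
        }
        catch (IOException e) {
            warnings.add("failed to disconnect: " + e.getMessage());
        }
    }
}

// Simulates a connection whose peer is already gone: logout() always fails.
final class DeadConnection implements ControlConnection {
    boolean disconnected;

    @Override
    public boolean logout() throws IOException {
        throw new IOException("Broken pipe (Write failed)");
    }

    @Override
    public void disconnect() {
        this.disconnected = true;
    }
}
```

With `DeadConnection`, close() records one warning for the failed logout and still reaches disconnect(), so local socket resources are released either way.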

@artembilan
Member

Yes, please. TRACE for the org.springframework.integration category would be enough.
It would then be great to compare the logs from 5.1.9 and 5.2.3.

The WARN we get is because there is no longer a connection to the FTP server to log out from.
My feeling is that we just weren't calling FtpSession.close() previously...

@artembilan
Member

so we don't skip the disconnect().

Yeah... Agreed. But that is not the point. @jonforums just doesn't see the WARN in the previous version.

@garyrussell
Contributor

garyrussell commented Feb 5, 2020

not the point

I understand that; I just don't see a new path to getting these exceptions. Broken pipe when sending the QUIT implies the server reset the connection; makes no sense to me.

@jonforums
Author

Will set TRACE on org.springframework.integration in the running prod 5.2.3 environment, and reply with a relevant log snippet.

Also reading 3rd para of https://commons.apache.org/proper/commons-net/apidocs/org/apache/commons/net/ftp/FTP.html talking about FTP servers closing idle client connections.
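For reference, one common mitigation for server-side idle timeouts is to send a periodic NOOP while a connection sits unused. Below is a rough sketch under stated assumptions: `NoopSender` is a hypothetical interface standing in for the client, the period is arbitrary, and the sketch ignores the fact that a pooled session must not be pinged while another thread is using it.

```java
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the single client call this sketch needs.
interface NoopSender {
    boolean sendNoOp() throws IOException;
}

final class IdleKeepAlive implements AutoCloseable {
    // Daemon thread so the keep-alive never blocks JVM shutdown.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "ftp-keepalive");
                t.setDaemon(true);
                return t;
            });
    private final NoopSender client;
    final AtomicInteger failures = new AtomicInteger();

    IdleKeepAlive(NoopSender client) {
        this.client = client;
    }

    // One keep-alive attempt; a failure means the connection is likely dead.
    void ping() {
        try {
            this.client.sendNoOp();
        }
        catch (IOException e) {
            this.failures.incrementAndGet();
        }
    }

    // Schedules ping() repeatedly with the given period.
    void start(long period, TimeUnit unit) {
        this.scheduler.scheduleAtFixedRate(this::ping, period, period, unit);
    }

    @Override
    public void close() {
        this.scheduler.shutdownNow();
    }
}
```

Something like `new IdleKeepAlive(sender).start(60, TimeUnit.SECONDS)` would ping once a minute. Whether that is preferable to simply letting the pool evict stale sessions depends on the server's idle policy.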

@garyrussell
Contributor

garyrussell commented Feb 5, 2020

at org.springframework.integration.util.SimplePool.doRemoveItem(SimplePool.java:256)
at org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:202)

Interesting; in both cases, the close() is being performed because session.isOpen() returned false.

	@Override
	public boolean isOpen() {
		try {
			this.client.noop();
		}
		catch (Exception e) {
			return false;
		}
		return true;
	}

Seems odd that the noop failed but the client is still reporting as isConnected().
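One plausible explanation, assuming the client's isConnected() ultimately reflects the state of the underlying java.net.Socket: per the JDK documentation, Socket.isConnected() reports whether the socket was ever successfully connected, and it is not cleared when the peer goes away. The self-contained sketch below uses a throwaway loopback server (nothing FTP-specific) to show the flags after the remote end closes:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class ConnectedFlagDemo {

    // Connects to a loopback server that immediately closes its end, then
    // returns { peerCloseObserved, isConnected(), isClosed() } for the client.
    static boolean[] flagsAfterPeerClose() throws IOException {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
                Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
            client.setSoTimeout(5000);      // do not hang forever if something goes wrong
            server.accept().close();        // the "server" drops the connection
            // read() returning -1 shows the client has seen the peer's close
            int eof = client.getInputStream().read();
            return new boolean[] { eof == -1, client.isConnected(), client.isClosed() };
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] flags = flagsAfterPeerClose();
        System.out.println("peer close observed=" + flags[0]
                + ", isConnected()=" + flags[1]
                + ", isClosed()=" + flags[2]);
    }
}
```

If that assumption holds, a connectivity flag alone cannot tell a live control connection from one the server has already dropped; only an actual round trip such as NOOP can.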

@jonforums
Author

Is there a more specific logger I can set to TRACE, since org.springframework.integration appears to capture every poll? It may be a while before my customer sends in another file on their batch schedule.

2020-02-05T13:26:31.909 [taskScheduler-7] DEBUG org.springframework.integration.endpoint.PollingConsumer - Received no Message during the poll, returning 'false'

@artembilan
Member

Try org.springframework.integration.ftp and org.springframework.integration.util then.

@jonforums
Author

Interesting to see the time delta between the first successful batch of FTP sends, the second batch of failed FTP sends, and SimplePool trying to reuse a pool session object that turns out to be stale.

Maybe there's a code path in SimplePool's stale session removal logic that balks if the FTP server has already disconnected due to the server thinking the old FTP session was idle?
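To make the suspected path concrete, here is a simplified, single-threaded sketch of a pool that probes an item on checkout and evicts it when stale. The names `PooledSession`, `StaleCheckingPool` and `FakeSession` are hypothetical; this is not the SimplePool source and it omits sizing, waiting and locking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical minimal session; not the Spring Integration Session interface.
interface PooledSession {
    boolean isOpen(); // liveness probe, e.g. a NOOP round trip
    void close();     // may warn if the peer is already gone
}

final class StaleCheckingPool {
    private final Deque<PooledSession> available = new ArrayDeque<>();
    private final Supplier<PooledSession> factory;
    final List<String> events = new ArrayList<>();

    StaleCheckingPool(Supplier<PooledSession> factory) {
        this.factory = factory;
    }

    PooledSession getItem() {
        PooledSession item;
        while ((item = this.available.poll()) != null) {
            if (item.isOpen()) {
                this.events.add("obtained from pool");
                return item;
            }
            // Stale: evict it. close() is where a logout on a dead socket would fail.
            this.events.add("stale item, removing");
            item.close();
        }
        this.events.add("obtained new");
        return this.factory.get();
    }

    void releaseItem(PooledSession item) {
        this.available.push(item);
    }
}

// Test double whose liveness can be flipped to mimic a server-side idle timeout.
final class FakeSession implements PooledSession {
    boolean open = true;
    int closeCalls;

    @Override
    public boolean isOpen() {
        return this.open;
    }

    @Override
    public void close() {
        this.closeCalls++;
        this.open = false;
    }
}
```

In this model, the sequence "obtained from pool", "stale item", "removing" appears whenever a released session dies while idle, which matches the shape of the log lines in the failing run.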

# summary of first good FTP access and send of (2) files, reusing pooled session FtpSession@5ad93db2
#
2020-02-05T14:56:16.103 [taskScheduler-6] DEBUG org.springframework.integration.ftp.outbound.FtpMessageHandler - bean 'ftpHandler'; defined in: 'com.XXXX.config.FtpConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@5ae50ce6' received message: GenericMessage [...]
2020-02-05T14:56:21.127 [taskScheduler-6] DEBUG org.springframework.integration.ftp.session.DefaultFtpSessionFactory - Connected to server [prod.XXXX:21]
2020-02-05T14:56:21.198 [taskScheduler-6] DEBUG org.springframework.integration.util.SimplePool - Obtained new org.springframework.integration.ftp.session.FtpSession@5ad93db2.
2020-02-05T14:56:21.203 [taskScheduler-6] INFO  org.springframework.integration.ftp.session.FtpSession - File has been successfully transferred to: XXXX/work/in/x12/foo-1-namemangled.writing
2020-02-05T14:56:21.204 [taskScheduler-6] INFO  org.springframework.integration.ftp.session.FtpSession - File has been successfully renamed from XXXX/work/in/x12/foo-1-namemangled.writing to XXXX/work/in/x12/foo-1-namemangled
2020-02-05T14:56:21.205 [taskScheduler-6] DEBUG org.springframework.integration.util.SimplePool - Releasing org.springframework.integration.ftp.session.FtpSession@5ad93db2 back to the pool
2020-02-05T14:56:21.209 [taskScheduler-6] DEBUG org.springframework.integration.ftp.outbound.FtpMessageHandler - bean 'ftpHandler'; defined in: 'com.XXXX.config.FtpConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@5ae50ce6' received message: GenericMessage [...]
2020-02-05T14:56:21.209 [taskScheduler-6] DEBUG org.springframework.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@5ad93db2 from pool.
2020-02-05T14:56:21.213 [taskScheduler-6] INFO  org.springframework.integration.ftp.session.FtpSession - File has been successfully transferred to: XXXX/work/in/x12/foo-2-namemangled.writing
2020-02-05T14:56:21.214 [taskScheduler-6] INFO  org.springframework.integration.ftp.session.FtpSession - File has been successfully renamed from XXXX/work/in/x12/foo-2-namemangled.writing to XXXX/work/in/x12/foo-2-namemangled
2020-02-05T14:56:21.214 [taskScheduler-6] DEBUG org.springframework.integration.util.SimplePool - Releasing org.springframework.integration.ftp.session.FtpSession@5ad93db2 back to the pool

In between the above successful FTP send of two files and the following FTP send failure, (2) good SFTP sends occurred using SimplePool and SftpSession instances.

# This set of attempts occur at 15:29:40 while the previous good FTP sessions
# occurred at 14:56:21. SimplePool discovers that FtpSession@5ad93db2 is
# stale, tries to remove the session object, but the object removal looks to fall
# down while trying to disconnect from the FTP server. Sadly, I can't verify
# whether the FTP server has already disconnected at the time of this exception.
#
2020-02-05T15:29:40.021 [taskScheduler-9] DEBUG org.springframework.integration.ftp.outbound.FtpMessageHandler - bean 'ftpHandler'; defined in: 'com.XXXX.config.FtpConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@5ae50ce6' received message: GenericMessage [...]
2020-02-05T15:29:40.067 [taskScheduler-9] DEBUG org.springframework.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@5ad93db2 from pool.
2020-02-05T15:29:40.069 [taskScheduler-9] DEBUG org.springframework.integration.util.SimplePool - Received a stale item org.springframework.integration.ftp.session.FtpSession@5ad93db2, will attempt to get a new one.
2020-02-05T15:29:40.069 [taskScheduler-9] DEBUG org.springframework.integration.util.SimplePool - Removing org.springframework.integration.ftp.session.FtpSession@5ad93db2 from the pool
2020-02-05T15:29:40.088 [taskScheduler-9] WARN  org.springframework.integration.ftp.session.FtpSession - failed to disconnect FTPClient
java.net.SocketException: Broken pipe (Write failed)
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
	at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
	at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
	at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
	at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
	at java.io.BufferedWriter.flush(BufferedWriter.java:254)
	at org.apache.commons.net.ftp.FTP.__send(FTP.java:545)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:519)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:648)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:622)
	at org.apache.commons.net.ftp.FTP.quit(FTP.java:904)
	at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:1148)
	at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:159)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:93)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:80)
	at org.springframework.integration.util.SimplePool.doRemoveItem(SimplePool.java:256)
	at org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:202)
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:173)
	at org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:132)
	at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:298)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:286)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:278)
	at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:205)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
	at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler.handleRequestMessage(AbstractReplyProducingMessageHandler.java:201)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$CallbackImpl.execute(AbstractRequestHandlerAdvice.java:151)
	at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.doInvoke(ExpressionEvaluatingRequestHandlerAdvice.java:237)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:67)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy37.handleRequestMessage(Unknown Source)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:146)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:130)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
	at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:396)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:380)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@artembilan
Member

So, I think we need to replace the this.client.isConnected() check before logout() with our own isOpen(), which essentially calls this.client.noop().
The CachingSessionFactory determines after noop() that a session is stale and tries to close it. But isConnected() is still true at that point, so we try to logout() anyway. That is inconsistent.
It looks like nothing sets Socket.connected to false after an exception during a write to the socket...

Does that sound reasonable?
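One way to read this proposal, as a sketch only: `ProbedClient` is a hypothetical interface mirroring the three client calls involved, and the actual change may well differ. Here close() only attempts logout() when the NOOP probe succeeds, and always attempts disconnect().

```java
import java.io.IOException;

// Hypothetical stand-in for the client calls involved; not the Commons Net API.
interface ProbedClient {
    int noop() throws IOException;
    boolean logout() throws IOException;
    void disconnect() throws IOException;
}

final class ProbingSession {
    private final ProbedClient client;

    ProbingSession(ProbedClient client) {
        this.client = client;
    }

    // A real round trip to the server instead of a local connectivity flag.
    boolean isOpen() {
        try {
            this.client.noop();
            return true;
        }
        catch (IOException e) {
            return false;
        }
    }

    void close() {
        try {
            if (isOpen()) {
                this.client.logout(); // only say goodbye if the server can hear it
            }
            this.client.disconnect();
        }
        catch (IOException e) {
            System.err.println("failed to disconnect: " + e.getMessage());
        }
    }
}

// Test double that records which calls were made.
final class RecordingClient implements ProbedClient {
    final boolean alive;
    int logoutCalls;
    int disconnectCalls;

    RecordingClient(boolean alive) {
        this.alive = alive;
    }

    @Override
    public int noop() throws IOException {
        if (!this.alive) {
            throw new IOException("Broken pipe (Write failed)");
        }
        return 200;
    }

    @Override
    public boolean logout() {
        this.logoutCalls++;
        return true;
    }

    @Override
    public void disconnect() {
        this.disconnectCalls++;
    }
}
```

The trade-off is one extra NOOP round trip per close on a healthy connection, in exchange for not attempting a QUIT on a socket the server has already reset.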

@garyrussell
Contributor

Sounds reasonable to me, but I still don't understand why this is a "new" problem on 5.2.x.

I suggest we also add a DEBUG or TRACE log when we get an exception in isOpen().
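A small sketch of what that could look like. It uses java.util.logging (FINE as a rough DEBUG equivalent) purely to stay dependency-free, which is an assumption and not the logging API the project uses, and `NoopClient` is a hypothetical stand-in for the client.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the single client call used by the probe.
interface NoopClient {
    int noop() throws IOException;
}

final class LoggingProbe {
    private static final Logger LOGGER = Logger.getLogger(LoggingProbe.class.getName());

    private final NoopClient client;

    LoggingProbe(NoopClient client) {
        this.client = client;
    }

    boolean isOpen() {
        try {
            this.client.noop();
            return true;
        }
        catch (IOException e) {
            // Keep the cause visible at a low level instead of swallowing it.
            LOGGER.log(Level.FINE, "session is not open: NOOP failed", e);
            return false;
        }
    }
}
```

With the cause logged, a trace of a stale-session eviction would show why the probe failed (for example a broken pipe versus a timeout) before the close is attempted.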

@artembilan
Member

"new" problem on 5.2.x.

Yeah... That's really weird. It doesn't look like we made any changes to caching between those versions.

@jonforums, any chance you could investigate on your side with version 5.1.9?

Thanks

@jonforums
Author

After spelunking SI's FTP, session, pool, and remote code more, I also don't see why this is a "new" 5.2.x problem...5.2.x vs. 5.1.9 now seems like a red herring. I'll spelunk a few log archives from 5.1.9 days to see if a similar exception was in the logs and I missed them back then.

@jonforums
Author

Over a 5 week period (last 2 weeks of 2019, first 3 weeks of 2020) no FtpSession WARN msgs occur in my logs from the previous 5.1.9 prod app.

While I also don't think it's a 5.1.9 vs. 5.2.3 issue, it is odd that over 5 weeks of prod usage I didn't see the FtpSession warning.

I know you're trying to determine if/where to backport. If you think it will meaningfully help, let me know if you'd like me to try to repro on 5.1.9. I'd try to quickly configure a long-running MINA FtpServer v1.1.1 process on my windows dev laptop to see what happens. Not apples-to-apples with my Ubuntu Server/vsftpd prod setup, but another data point if you need it.

@artembilan
Member

Thanks. It might be useful to compare the logs from similar processing on both versions to spot any difference.

@jonforums
Author

Repro'd on both 5.2.3 and 5.1.9 on my windows dev laptop using ftpserv v1.1.1. The test scenario for both was: FTP send (2) files, wait (5) minutes, SFTP send (1) file, wait (30) minutes, FTP send (1) file. In both 5.2.3 and 5.1.9, during the last FTP send SimplePool discovered it got a stale FtpSession and while trying to evict the stale session, the stale FtpSession instance threw a WARN - failed to disconnect FTPClient.

I can provide logs for both 5.2.3 and 5.1.9 if needed, but here's the relevant log snippet from the 5.1.9 run:

2020-02-07T11:35:08.642 [taskScheduler-12] DEBUG org.springframework.integration.ftp.outbound.FtpMessageHandler - ftpHandler received message: GenericMessage [...]
2020-02-07T11:35:08.642 [taskScheduler-12] DEBUG org.springframework.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@23943a98 from pool.
2020-02-07T11:35:08.642 [taskScheduler-12] DEBUG org.springframework.integration.util.SimplePool - Received a stale item org.springframework.integration.ftp.session.FtpSession@23943a98, will attempt to get a new one.
2020-02-07T11:35:08.642 [taskScheduler-12] DEBUG org.springframework.integration.util.SimplePool - Removing org.springframework.integration.ftp.session.FtpSession@23943a98 from the pool
2020-02-07T11:35:08.642 [taskScheduler-12] WARN  org.springframework.integration.ftp.session.FtpSession - failed to disconnect FTPClient
java.net.SocketException: Software caused connection abort: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
	at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
	at sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
	at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
	at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
	at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
	at java.io.BufferedWriter.flush(BufferedWriter.java:254)
	at org.apache.commons.net.ftp.FTP.__send(FTP.java:545)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:519)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:648)
	at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:622)
	at org.apache.commons.net.ftp.FTP.quit(FTP.java:904)
	at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:1148)
	at org.springframework.integration.ftp.session.FtpSession.close(FtpSession.java:156)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:94)
	at org.springframework.integration.file.remote.session.CachingSessionFactory$1.removedFromPool(CachingSessionFactory.java:81)
	at org.springframework.integration.util.SimplePool.doRemoveItem(SimplePool.java:256)
	at org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:202)
	at org.springframework.integration.util.SimplePool.getItem(SimplePool.java:173)
	at org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
	at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:298)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:286)
	at org.springframework.integration.file.remote.RemoteFileTemplate.send(RemoteFileTemplate.java:278)
	at org.springframework.integration.file.remote.handler.FileTransferringMessageHandler.handleMessageInternal(FileTransferringMessageHandler.java:205)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
	at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:49)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler.handleRequestMessage(AbstractReplyProducingMessageHandler.java:193)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$1.execute(AbstractRequestHandlerAdvice.java:75)
	at org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.doInvoke(ExpressionEvaluatingRequestHandlerAdvice.java:239)
	at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy29.handleRequestMessage(Unknown Source)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:141)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:126)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
	at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@artembilan
Member

So there is no difference between those versions, and we are good to go with noop() around logout() instead of isConnected()?

Or should we treat this as "Works as Designed" and close accordingly?

From what I see, you have confirmed that there is no regression and the behavior is the same.

@jonforums
Author

Whatever you decide, I'd like to see this specific class of FtpSession exception, which does not indicate a critical error, changed from WARN to DEBUG (or TRACE). By default, I run most of my prod loggers at INFO level. I don't see why this session timeout type of exception should be at WARN level, filling up the logs.

@artembilan
Member

Got it!

I'll ping you for PR review.

Or let me know if you are good with contributing the fix: https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.adoc

@jonforums
Author

I'm not going to go through the contribution ritual. It's enough to see that you two are comfortable with a reliable fix, and updates are on the way to my favorite Spring family project 😃

My only remaining concern is with integration tests. I don't believe the intermediate SFTP send in my test scenario had anything to do with the problem. I think it was caused by the ~35 minute (??) period between the first FTP sends and the second FTP send. It would be nice to have longer running integration tests that don't run too long but still catch these types of scenarios. Maybe the MINA-based ftpserver and sshserver libraries have an easy-to-configure option to set client disconnects to ~2-3 minutes.
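
One way to get this kind of coverage without a long wall-clock wait might be a stand-in server with a very short idle limit. The sketch below uses only plain `java.net` sockets. The `IdleDroppingServer` helper and its `readAfterIdle` method are hypothetical, not part of Spring Integration or the MINA-based servers, and it does not speak FTP. It only shows how a millisecond-scale idle timeout lets a test observe a server-side disconnect quickly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical test helper: accepts one client and drops it after a short idle period. */
class IdleDroppingServer implements AutoCloseable {

    private final ServerSocket serverSocket;

    IdleDroppingServer(int idleTimeoutMillis) throws IOException {
        this.serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
        Thread worker = new Thread(() -> serve(idleTimeoutMillis), "idle-dropping-server");
        worker.setDaemon(true);
        worker.start();
    }

    private void serve(int idleTimeoutMillis) {
        try (Socket peer = this.serverSocket.accept()) {
            // A read that sees no data for this long throws SocketTimeoutException.
            peer.setSoTimeout(idleTimeoutMillis);
            while (peer.getInputStream().read() != -1) {
                // Discard client input; each byte received restarts the idle wait.
            }
        }
        catch (SocketTimeoutException e) {
            // Idle limit reached; try-with-resources has already closed the peer socket.
        }
        catch (IOException e) {
            // Server socket closed or client reset; nothing more to do in a test helper.
        }
    }

    int port() {
        return this.serverSocket.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        this.serverSocket.close();
    }

    /** Connects, stays idle, and returns what the client reads once the server gives up. */
    static int readAfterIdle(int idleTimeoutMillis) {
        try (IdleDroppingServer server = new IdleDroppingServer(idleTimeoutMillis);
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.port())) {
            // Safety net so a broken helper cannot hang the test forever.
            client.setSoTimeout(10 * idleTimeoutMillis + 5_000);
            // Blocks until the server drops the idle connection, then yields -1.
            return client.getInputStream().read();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `IdleDroppingServer.readAfterIdle(200)` would be expected to return `-1` after roughly 200 ms. A real integration test would need the embedded FTP server's own idle-timeout setting instead, which is not shown here.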

@artembilan artembilan self-assigned this Feb 10, 2020
artembilan added a commit to artembilan/spring-integration that referenced this issue Feb 10, 2020
Fixes spring-projects#3168

* Call `this.client.noop()` instead of `this.client.isConnected()` to really
check that the client has a live connection to the server before calling `this.client.logout()`
* Wrap `this.client.logout()` in a `try..catch` to be sure that we will call
`this.client.disconnect()` even if `logout()` fails for some reason.
* Change the `WARN` logs about `Session.close()` to `DEBUG` level -
when closing a session fails because the server has disconnected,
there is nothing we can do about the situation

**Cherry-pick to 5.2.x**
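
The close sequence described in the bullets above can be sketched roughly as follows. This is not the actual `FtpSession` code. `ControlClient` is a hypothetical three-method stand-in for the client, with simplified signatures, and the returned list of steps takes the place of the DEBUG logging so the flow can be inspected without a logging framework.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the three client calls named in the commit message. */
interface ControlClient {
    boolean noop() throws IOException;
    boolean logout() throws IOException;
    void disconnect() throws IOException;
}

class SessionCloser {

    /** Closes the client and returns the steps attempted, in order. */
    static List<String> close(ControlClient client) {
        List<String> steps = new ArrayList<>();

        // 1. Probe liveness with a real round trip instead of trusting a connected flag.
        boolean alive = false;
        try {
            alive = client.noop();
            steps.add(alive ? "noop ok" : "noop rejected");
        }
        catch (IOException e) {
            steps.add("noop failed");
        }

        // 2. Log out only from a live session, and never let a failure skip step 3.
        if (alive) {
            try {
                client.logout();
                steps.add("logout");
            }
            catch (IOException e) {
                steps.add("logout failed");
            }
        }

        // 3. Always release the socket; a failure here is low severity by this point.
        try {
            client.disconnect();
            steps.add("disconnect");
        }
        catch (IOException e) {
            steps.add("disconnect failed");
        }
        return steps;
    }

    public static void main(String[] args) {
        ControlClient stale = new ControlClient() {
            public boolean noop() throws IOException {
                throw new IOException("socket write error");
            }
            public boolean logout() {
                return true;
            }
            public void disconnect() {
            }
        };
        System.out.println(close(stale));
    }
}
```

With a stale client whose `noop()` throws, the sketch skips `logout()` entirely and still reaches `disconnect()`, which is the behavior the bullets aim for.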
garyrussell pushed a commit that referenced this issue Feb 11, 2020
Fixes #3168

* Call `this.client.noop()` instead of `this.client.isConnected()` to really
check that the client has a live connection to the server before calling `this.client.logout()`
* Wrap `this.client.logout()` in a `try..catch` to be sure that we will call
`this.client.disconnect()` even if `logout()` fails for some reason.
* Change the `WARN` logs about `Session.close()` to `DEBUG` level -
when closing a session fails because the server has disconnected,
there is nothing we can do about the situation

**Cherry-pick to 5.2.x**