diff --git a/src/KafkaNET.Library/KafkaConnection.cs b/src/KafkaNET.Library/KafkaConnection.cs index 475ba7d..414e197 100644 --- a/src/KafkaNET.Library/KafkaConnection.cs +++ b/src/KafkaNET.Library/KafkaConnection.cs @@ -191,7 +191,8 @@ public override string ToString() private void Connect() { - if (socket != null) + var watch = Stopwatch.StartNew(); + if (this.socket != null) { try { @@ -199,13 +200,12 @@ private void Connect() } catch (Exception e) { - Logger.Error(this.ToString() + ExceptionUtil.GetExceptionDetailInfo(e)); + Logger.Error(string.Format("KafkaConnectio.Connect() exception in CloseConnection, duration={0}ms", watch.ElapsedMilliseconds), e); } } this.socket = null; - IPAddress targetAddress; if (IPAddress.TryParse(server, out targetAddress)) { @@ -220,12 +220,23 @@ private void Connect() ReceiveBufferSize = bufferSize }; - newSocket.Connect(targetAddress, port); - socket = newSocket; + var result = newSocket.BeginConnect(targetAddress, port, null, null); + // use receiveTimeoutMs as connectionTimeoutMs + result.AsyncWaitHandle.WaitOne(this.receiveTimeoutMs, true); + result.AsyncWaitHandle.Close(); + + if (newSocket.Connected) + { + this.socket = newSocket; + } + else + { + newSocket.Close(); + } } catch (Exception ex) { - Logger.Error(this.ToString() + ExceptionUtil.GetExceptionDetailInfo(ex)); + Logger.Error(string.Format("KafkaConnectio.Connect() failed, duration={0}ms,this={1},targetAddress={2}", watch.ElapsedMilliseconds, this, targetAddress), ex); throw new UnableToConnectToHostException(targetAddress.ToString(), port, ex); } } @@ -243,20 +254,31 @@ private void Connect() try { var newSocket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp) + { + NoDelay = true, + ReceiveTimeout = this.receiveTimeoutMs, + SendTimeout = this.sendTimeoutMs, + SendBufferSize = bufferSize, + ReceiveBufferSize = bufferSize + }; + + var result = newSocket.BeginConnect(address, port, null, null); + // use receiveTimeoutMs as connectionTimeoutMs + result.AsyncWaitHandle.WaitOne(this.receiveTimeoutMs, true); + result.AsyncWaitHandle.Close(); + + if (!newSocket.Connected) { - NoDelay = true, - ReceiveTimeout = this.receiveTimeoutMs, - SendTimeout = this.sendTimeoutMs, - SendBufferSize = bufferSize, - ReceiveBufferSize = bufferSize - }; - - newSocket.Connect(address, port); - socket = newSocket; + newSocket.Close(); + continue; + } + + this.socket = newSocket; break; } catch (Exception e) { + Logger.Error(string.Format("ErrorConnectingToAddress, duration={0}ms,address={1},server={2},port={3}", watch.ElapsedMilliseconds, address, server, port), e); throw new UnableToConnectToHostException(server, port, e); } } @@ -264,12 +286,18 @@ private void Connect() if (socket == null) { + Logger.ErrorFormat("UnableToConnectToHostException, duration={0}ms,server={1},port={2}", watch.ElapsedMilliseconds, server, port); throw new UnableToConnectToHostException(server, port); } - this.stream = new NetworkStream(socket, true); - this.stream.ReadTimeout = networkStreamReadTimeoutMs; - this.stream.WriteTimeout = networkStreamWriteTimeoutMs; + Logger.DebugFormat("KafkaConnection.Connect() succeeded, duration={0}ms,server={1},port={2}", + watch.ElapsedMilliseconds, server, port); + + this.stream = new NetworkStream(socket, true) + { + ReadTimeout = this.networkStreamReadTimeoutMs, + WriteTimeout = this.networkStreamWriteTimeoutMs + }; this.reader = new KafkaBinaryReader(stream); Connected = true; this.lastActiveTimeMs = Environment.TickCount;