Skip to content
This repository was archived by the owner on Jul 15, 2023. It is now read-only.

Commit d4c2dda

Browse files
Merge pull request #49 from danielli90/connect
change Connect() to use async connect call and honor receiveTimeoutMs…
2 parents bdb8515 + 4b743bc commit d4c2dda

File tree

1 file changed

+46
-18
lines changed

1 file changed

+46
-18
lines changed

src/KafkaNET.Library/KafkaConnection.cs

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -191,21 +191,21 @@ public override string ToString()
191191

192192
private void Connect()
193193
{
194-
if (socket != null)
194+
var watch = Stopwatch.StartNew();
195+
if (this.socket != null)
195196
{
196197
try
197198
{
198199
CloseConnection();
199200
}
200201
catch (Exception e)
201202
{
202-
Logger.Error(this.ToString() + ExceptionUtil.GetExceptionDetailInfo(e));
203+
Logger.Error(string.Format("KafkaConnectio.Connect() exception in CloseConnection, duration={0}ms", watch.ElapsedMilliseconds), e);
203204
}
204205
}
205206

206207
this.socket = null;
207208

208-
209209
IPAddress targetAddress;
210210
if (IPAddress.TryParse(server, out targetAddress))
211211
{
@@ -220,12 +220,23 @@ private void Connect()
220220
ReceiveBufferSize = bufferSize
221221
};
222222

223-
newSocket.Connect(targetAddress, port);
224-
socket = newSocket;
223+
var result = newSocket.BeginConnect(targetAddress, port, null, null);
224+
// use receiveTimeoutMs as connectionTimeoutMs
225+
result.AsyncWaitHandle.WaitOne(this.receiveTimeoutMs, true);
226+
result.AsyncWaitHandle.Close();
227+
228+
if (newSocket.Connected)
229+
{
230+
this.socket = newSocket;
231+
}
232+
else
233+
{
234+
newSocket.Close();
235+
}
225236
}
226237
catch (Exception ex)
227238
{
228-
Logger.Error(this.ToString() + ExceptionUtil.GetExceptionDetailInfo(ex));
239+
Logger.Error(string.Format("KafkaConnectio.Connect() failed, duration={0}ms,this={1},targetAddress={2}", watch.ElapsedMilliseconds, this, targetAddress), ex);
229240
throw new UnableToConnectToHostException(targetAddress.ToString(), port, ex);
230241
}
231242
}
@@ -243,33 +254,50 @@ private void Connect()
243254
try
244255
{
245256
var newSocket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
257+
{
258+
NoDelay = true,
259+
ReceiveTimeout = this.receiveTimeoutMs,
260+
SendTimeout = this.sendTimeoutMs,
261+
SendBufferSize = bufferSize,
262+
ReceiveBufferSize = bufferSize
263+
};
264+
265+
var result = newSocket.BeginConnect(address, port, null, null);
266+
// use receiveTimeoutMs as connectionTimeoutMs
267+
result.AsyncWaitHandle.WaitOne(this.receiveTimeoutMs, true);
268+
result.AsyncWaitHandle.Close();
269+
270+
if (!newSocket.Connected)
246271
{
247-
NoDelay = true,
248-
ReceiveTimeout = this.receiveTimeoutMs,
249-
SendTimeout = this.sendTimeoutMs,
250-
SendBufferSize = bufferSize,
251-
ReceiveBufferSize = bufferSize
252-
};
253-
254-
newSocket.Connect(address, port);
255-
socket = newSocket;
272+
newSocket.Close();
273+
continue;
274+
}
275+
276+
this.socket = newSocket;
256277
break;
257278
}
258279
catch (Exception e)
259280
{
281+
Logger.Error(string.Format("ErrorConnectingToAddress, duration={0}ms,address={1},server={2},port={3}", watch.ElapsedMilliseconds, address, server, port), e);
260282
throw new UnableToConnectToHostException(server, port, e);
261283
}
262284
}
263285
}
264286

265287
if (socket == null)
266288
{
289+
Logger.ErrorFormat("UnableToConnectToHostException, duration={0}ms,server={1},port={2}", watch.ElapsedMilliseconds, server, port);
267290
throw new UnableToConnectToHostException(server, port);
268291
}
269292

270-
this.stream = new NetworkStream(socket, true);
271-
this.stream.ReadTimeout = networkStreamReadTimeoutMs;
272-
this.stream.WriteTimeout = networkStreamWriteTimeoutMs;
293+
Logger.DebugFormat("KafkaConnection.Connect() succeeded, duration={0}ms,server={1},port={2}",
294+
watch.ElapsedMilliseconds, server, port);
295+
296+
this.stream = new NetworkStream(socket, true)
297+
{
298+
ReadTimeout = this.networkStreamReadTimeoutMs,
299+
WriteTimeout = this.networkStreamWriteTimeoutMs
300+
};
273301
this.reader = new KafkaBinaryReader(stream);
274302
Connected = true;
275303
this.lastActiveTimeMs = Environment.TickCount;

0 commit comments

Comments
 (0)