@@ -274,9 +274,7 @@ def __init__(self, host, port, afi, **configs):
274274 # can use a simple dictionary of correlation_id => request data
275275 self .in_flight_requests = dict ()
276276
277- self ._protocol = KafkaProtocol (
278- client_id = self .config ['client_id' ],
279- api_version = self .config ['api_version' ])
277+ self ._protocol = self ._new_protocol_parser ()
280278 self .state = ConnectionStates .DISCONNECTED
281279 self ._reset_reconnect_backoff ()
282280 self ._sock = None
@@ -295,6 +293,12 @@ def __init__(self, host, port, afi, **configs):
295293 self .config ['metric_group_prefix' ],
296294 self .node_id )
297295
296+ def _new_protocol_parser (self ):
297+ return KafkaProtocol (
298+ ident = '%s:%d' % (self .host , self .port ),
299+ client_id = self .config ['client_id' ],
300+ api_version = self .config ['api_version' ])
301+
298302 def _init_sasl_mechanism (self ):
299303 if self .config ['security_protocol' ] in ('SASL_PLAINTEXT' , 'SASL_SSL' ):
300304 self ._sasl_mechanism = get_sasl_mechanism (self .config ['sasl_mechanism' ])(host = self .host , ** self .config )
@@ -934,9 +938,7 @@ def close(self, error=None):
934938 self ._api_versions_future = None
935939 self ._sasl_auth_future = None
936940 self ._init_sasl_mechanism ()
937- self ._protocol = KafkaProtocol (
938- client_id = self .config ['client_id' ],
939- api_version = self .config ['api_version' ])
941+ self ._protocol = self ._new_protocol_parser ()
940942 self ._send_buffer = b''
941943 if error is None :
942944 error = Errors .Cancelled (str (self ))
0 commit comments