
Commit 03c357f

Support custom SASL mechanisms
There is some interest in supporting various SASL mechanisms not currently included in the library:

* dpkp#2110 (DMS)
* dpkp#2204 (SSPI)
* dpkp#2232 (AWS_MSK_IAM)

Adding these mechanisms to the core library may be undesirable due to:

* Increased maintenance burden.
* Unavailable testing environments.
* Vendor specificity.

This commit provides a quick prototype for a pluggable SASL system.

---

**Example**

To define a custom SASL mechanism, a module must implement two methods:

```py
def validate_config(conn):
    # Check configuration values, available libraries, etc.
    assert conn.config['vendor_specific_setting'] is not None, (
        'vendor_specific_setting required when sasl_mechanism=MY_SASL'
    )


def try_authenticate(conn, future):
    # Do the authentication routine and return the resolved Future with a
    # failed or succeeded state.
```

And then the custom mechanism should be registered before initializing a KafkaAdminClient, KafkaConsumer, or KafkaProducer:

```py
import kafka.sasl
from kafka import KafkaProducer

import my_sasl

kafka.sasl.register_mechanism('MY_SASL', my_sasl)
producer = KafkaProducer(sasl_mechanism='MY_SASL')
```

---

**Notes**

**ABCs**

This prototype does not implement an ABC for custom SASL mechanisms. Using an ABC would reduce a few of the explicit assertions involved with registering a mechanism and is a viable option. Due to the differing feature sets between py2/py3 this option was not explored here, but it shouldn't be difficult to add.

**Private Methods**

This prototype relies on some methods that are currently marked as **private** in `BrokerConnection`:

* `._can_send_recv`
* `._lock`
* `._recv_bytes_blocking`
* `._send_bytes_blocking`

A pluggable system would require stable interfaces for these actions.

**Alternative Approach**

If the module-scoped dict modification in `register_mechanism` feels too clunky, maybe the additional mechanisms could instead be specified via an argument when initializing one of the `Kafka*` classes?
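As a rough illustration of the ABC option mentioned in the notes above — not part of this commit, and `SaslMechanism` is a hypothetical name — a py2/py3-compatible base class might look something like this sketch:

```py
import abc

from kafka.vendor import six


@six.add_metaclass(abc.ABCMeta)
class SaslMechanism(object):
    """Hypothetical base class a custom mechanism could subclass instead of
    satisfying the module-level assertions in register_mechanism."""

    @abc.abstractmethod
    def validate_config(self, conn):
        """Raise AssertionError for missing or invalid config values."""

    @abc.abstractmethod
    def try_authenticate(self, conn, future):
        """Run the authentication routine and return a resolved Future."""
```

Registration would then accept an instance (or class) rather than a module, and an isinstance check would replace the two `callable()` assertions.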
1 parent f0a57a6 commit 03c357f

File tree

6 files changed: +378 −255 lines changed


kafka/conn.py

Lines changed: 19 additions & 255 deletions
```diff
@@ -2,7 +2,6 @@
 
 import copy
 import errno
-import io
 import logging
 from random import shuffle, uniform
 
@@ -14,25 +13,26 @@
     from kafka.vendor import selectors34 as selectors
 
 import socket
-import struct
 import threading
 import time
 
 from kafka.vendor import six
 
+from kafka import sasl
 import kafka.errors as Errors
 from kafka.future import Future
 from kafka.metrics.stats import Avg, Count, Max, Rate
-from kafka.oauth.abstract import AbstractTokenProvider
-from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2, DescribeClientQuotasRequest
+from kafka.protocol.admin import (
+    DescribeAclsRequest_v2,
+    DescribeClientQuotasRequest,
+    SaslHandShakeRequest,
+)
 from kafka.protocol.commit import OffsetFetchRequest
 from kafka.protocol.offset import OffsetRequest
 from kafka.protocol.produce import ProduceRequest
 from kafka.protocol.metadata import MetadataRequest
 from kafka.protocol.fetch import FetchRequest
 from kafka.protocol.parser import KafkaProtocol
-from kafka.protocol.types import Int32, Int8
-from kafka.scram import ScramClient
 from kafka.version import __version__
 
 
@@ -83,6 +83,12 @@ class SSLWantWriteError(Exception):
     gssapi = None
     GSSError = None
 
+# needed for AWS_MSK_IAM authentication:
+try:
+    from botocore.session import Session as BotoSession
+except ImportError:
+    # no botocore available, will disable AWS_MSK_IAM mechanism
+    BotoSession = None
 
 AFI_NAMES = {
     socket.AF_UNSPEC: "unspecified",
@@ -227,7 +233,6 @@ class BrokerConnection(object):
         'sasl_oauth_token_provider': None
     }
     SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
-    SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512")
 
     def __init__(self, host, port, afi, **configs):
         self.host = host
@@ -260,22 +265,10 @@ def __init__(self, host, port, afi, **configs):
             assert ssl_available, "Python wasn't built with SSL support"
 
         if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
-            assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
-                'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS))
-            if self.config['sasl_mechanism'] in ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512'):
-                assert self.config['sasl_plain_username'] is not None, (
-                    'sasl_plain_username required for PLAIN or SCRAM sasl'
-                )
-                assert self.config['sasl_plain_password'] is not None, (
-                    'sasl_plain_password required for PLAIN or SCRAM sasl'
-                )
-            if self.config['sasl_mechanism'] == 'GSSAPI':
-                assert gssapi is not None, 'GSSAPI lib not available'
-                assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
-            if self.config['sasl_mechanism'] == 'OAUTHBEARER':
-                token_provider = self.config['sasl_oauth_token_provider']
-                assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
-                assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()'
+            assert self.config['sasl_mechanism'] in sasl.MECHANISMS, (
+                'sasl_mechanism must be one of {}'.format(', '.join(sasl.MECHANISMS.keys()))
+            )
+            sasl.MECHANISMS[self.config['sasl_mechanism']].validate_config(self)
         # This is not a general lock / this class is not generally thread-safe yet
         # However, to avoid pushing responsibility for maintaining
         # per-connection locks to the upstream client, we will use this lock to
@@ -553,19 +546,9 @@ def _handle_sasl_handshake_response(self, future, response):
                 Errors.UnsupportedSaslMechanismError(
                     'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
                     % (self.config['sasl_mechanism'], response.enabled_mechanisms)))
-        elif self.config['sasl_mechanism'] == 'PLAIN':
-            return self._try_authenticate_plain(future)
-        elif self.config['sasl_mechanism'] == 'GSSAPI':
-            return self._try_authenticate_gssapi(future)
-        elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
-            return self._try_authenticate_oauth(future)
-        elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"):
-            return self._try_authenticate_scram(future)
-        else:
-            return future.failure(
-                Errors.UnsupportedSaslMechanismError(
-                    'kafka-python does not support SASL mechanism %s' %
-                    self.config['sasl_mechanism']))
+
+        try_authenticate = sasl.MECHANISMS[self.config['sasl_mechanism']].try_authenticate
+        return try_authenticate(self, future)
 
     def _send_bytes(self, data):
         """Send some data via non-blocking IO
@@ -619,225 +602,6 @@ def _recv_bytes_blocking(self, n):
         finally:
             self._sock.settimeout(0.0)
 
-    def _try_authenticate_plain(self, future):
-        if self.config['security_protocol'] == 'SASL_PLAINTEXT':
-            log.warning('%s: Sending username and password in the clear', self)
-
-        data = b''
-        # Send PLAIN credentials per RFC-4616
-        msg = bytes('\0'.join([self.config['sasl_plain_username'],
-                               self.config['sasl_plain_username'],
-                               self.config['sasl_plain_password']]).encode('utf-8'))
-        size = Int32.encode(len(msg))
-
-        err = None
-        close = False
-        with self._lock:
-            if not self._can_send_recv():
-                err = Errors.NodeNotReadyError(str(self))
-                close = False
-            else:
-                try:
-                    self._send_bytes_blocking(size + msg)
-
-                    # The server will send a zero sized message (that is Int32(0)) on success.
-                    # The connection is closed on failure
-                    data = self._recv_bytes_blocking(4)
-
-                except (ConnectionError, TimeoutError) as e:
-                    log.exception("%s: Error receiving reply from server", self)
-                    err = Errors.KafkaConnectionError("%s: %s" % (self, e))
-                    close = True
-
-        if err is not None:
-            if close:
-                self.close(error=err)
-            return future.failure(err)
-
-        if data != b'\x00\x00\x00\x00':
-            error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
-            return future.failure(error)
-
-        log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
-        return future.success(True)
-
-    def _try_authenticate_scram(self, future):
-        if self.config['security_protocol'] == 'SASL_PLAINTEXT':
-            log.warning('%s: Exchanging credentials in the clear', self)
-
-        scram_client = ScramClient(
-            self.config['sasl_plain_username'], self.config['sasl_plain_password'], self.config['sasl_mechanism']
-        )
-
-        err = None
-        close = False
-        with self._lock:
-            if not self._can_send_recv():
-                err = Errors.NodeNotReadyError(str(self))
-                close = False
-            else:
-                try:
-                    client_first = scram_client.first_message().encode('utf-8')
-                    size = Int32.encode(len(client_first))
-                    self._send_bytes_blocking(size + client_first)
-
-                    (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
-                    server_first = self._recv_bytes_blocking(data_len).decode('utf-8')
-                    scram_client.process_server_first_message(server_first)
-
-                    client_final = scram_client.final_message().encode('utf-8')
-                    size = Int32.encode(len(client_final))
-                    self._send_bytes_blocking(size + client_final)
-
-                    (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
-                    server_final = self._recv_bytes_blocking(data_len).decode('utf-8')
-                    scram_client.process_server_final_message(server_final)
-
-                except (ConnectionError, TimeoutError) as e:
-                    log.exception("%s: Error receiving reply from server", self)
-                    err = Errors.KafkaConnectionError("%s: %s" % (self, e))
-                    close = True
-
-        if err is not None:
-            if close:
-                self.close(error=err)
-            return future.failure(err)
-
-        log.info(
-            '%s: Authenticated as %s via %s', self, self.config['sasl_plain_username'], self.config['sasl_mechanism']
-        )
-        return future.success(True)
-
-    def _try_authenticate_gssapi(self, future):
-        kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host
-        auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name
-        gssapi_name = gssapi.Name(
-            auth_id,
-            name_type=gssapi.NameType.hostbased_service
-        ).canonicalize(gssapi.MechType.kerberos)
-        log.debug('%s: GSSAPI name: %s', self, gssapi_name)
-
-        err = None
-        close = False
-        with self._lock:
-            if not self._can_send_recv():
-                err = Errors.NodeNotReadyError(str(self))
-                close = False
-            else:
-                # Establish security context and negotiate protection level
-                # For reference RFC 2222, section 7.2.1
-                try:
-                    # Exchange tokens until authentication either succeeds or fails
-                    client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
-                    received_token = None
-                    while not client_ctx.complete:
-                        # calculate an output token from kafka token (or None if first iteration)
-                        output_token = client_ctx.step(received_token)
-
-                        # pass output token to kafka, or send empty response if the security
-                        # context is complete (output token is None in that case)
-                        if output_token is None:
-                            self._send_bytes_blocking(Int32.encode(0))
-                        else:
-                            msg = output_token
-                            size = Int32.encode(len(msg))
-                            self._send_bytes_blocking(size + msg)
-
-                        # The server will send a token back. Processing of this token either
-                        # establishes a security context, or it needs further token exchange.
-                        # The gssapi will be able to identify the needed next step.
-                        # The connection is closed on failure.
-                        header = self._recv_bytes_blocking(4)
-                        (token_size,) = struct.unpack('>i', header)
-                        received_token = self._recv_bytes_blocking(token_size)
-
-                    # Process the security layer negotiation token, sent by the server
-                    # once the security context is established.
-
-                    # unwraps message containing supported protection levels and msg size
-                    msg = client_ctx.unwrap(received_token).message
-                    # Kafka currently doesn't support integrity or confidentiality security layers, so we
-                    # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
-                    # by the server
-                    msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:]
-                    # add authorization identity to the response, GSS-wrap and send it
-                    msg = client_ctx.wrap(msg + auth_id.encode(), False).message
-                    size = Int32.encode(len(msg))
-                    self._send_bytes_blocking(size + msg)
-
-                except (ConnectionError, TimeoutError) as e:
-                    log.exception("%s: Error receiving reply from server", self)
-                    err = Errors.KafkaConnectionError("%s: %s" % (self, e))
-                    close = True
-                except Exception as e:
-                    err = e
-                    close = True
-
-        if err is not None:
-            if close:
-                self.close(error=err)
-            return future.failure(err)
-
-        log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
-        return future.success(True)
-
-    def _try_authenticate_oauth(self, future):
-        data = b''
-
-        msg = bytes(self._build_oauth_client_request().encode("utf-8"))
-        size = Int32.encode(len(msg))
-
-        err = None
-        close = False
-        with self._lock:
-            if not self._can_send_recv():
-                err = Errors.NodeNotReadyError(str(self))
-                close = False
-            else:
-                try:
-                    # Send SASL OAuthBearer request with OAuth token
-                    self._send_bytes_blocking(size + msg)
-
-                    # The server will send a zero sized message (that is Int32(0)) on success.
-                    # The connection is closed on failure
-                    data = self._recv_bytes_blocking(4)
-
-                except (ConnectionError, TimeoutError) as e:
-                    log.exception("%s: Error receiving reply from server", self)
-                    err = Errors.KafkaConnectionError("%s: %s" % (self, e))
-                    close = True
-
-        if err is not None:
-            if close:
-                self.close(error=err)
-            return future.failure(err)
-
-        if data != b'\x00\x00\x00\x00':
-            error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
-            return future.failure(error)
-
-        log.info('%s: Authenticated via OAuth', self)
-        return future.success(True)
-
-    def _build_oauth_client_request(self):
-        token_provider = self.config['sasl_oauth_token_provider']
-        return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions())
-
-    def _token_extensions(self):
-        """
-        Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER
-        initial request.
-        """
-        token_provider = self.config['sasl_oauth_token_provider']
-
-        # Only run if the #extensions() method is implemented by the clients Token Provider class
-        # Builds up a string separated by \x01 via a dict of key value pairs
-        if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0:
-            msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()])
-            return "\x01" + msg
-        else:
-            return ""
-
     def blacked_out(self):
         """
         Return true if we are disconnected from the given node and can't
```
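
The per-mechanism routines removed above presumably move into the new `kafka/sasl/` package (the other four changed files — presumably `gssapi.py`, `oauthbearer.py`, `plain.py`, and `scram.py` — are not shown in this excerpt). As a sketch only, reconstructed from the deleted `_try_authenticate_plain` and the module interface described in the commit message, `kafka/sasl/plain.py` could look roughly like this; the actual file may differ:

```py
import logging

import kafka.errors as Errors
from kafka.protocol.types import Int32

log = logging.getLogger(__name__)


def validate_config(conn):
    assert conn.config['sasl_plain_username'] is not None, (
        'sasl_plain_username required for PLAIN sasl'
    )
    assert conn.config['sasl_plain_password'] is not None, (
        'sasl_plain_password required for PLAIN sasl'
    )


def try_authenticate(conn, future):
    if conn.config['security_protocol'] == 'SASL_PLAINTEXT':
        log.warning('%s: Sending username and password in the clear', conn)

    data = b''
    # Send PLAIN credentials per RFC-4616
    msg = bytes('\0'.join([conn.config['sasl_plain_username'],
                           conn.config['sasl_plain_username'],
                           conn.config['sasl_plain_password']]).encode('utf-8'))
    size = Int32.encode(len(msg))

    err = None
    close = False
    with conn._lock:
        if not conn._can_send_recv():
            err = Errors.NodeNotReadyError(str(conn))
            close = False
        else:
            try:
                conn._send_bytes_blocking(size + msg)
                # The server replies with a zero-length message (Int32(0)) on
                # success; the connection is closed on failure.
                data = conn._recv_bytes_blocking(4)
            except (ConnectionError, TimeoutError) as e:
                log.exception("%s: Error receiving reply from server", conn)
                err = Errors.KafkaConnectionError("%s: %s" % (conn, e))
                close = True

    if err is not None:
        if close:
            conn.close(error=err)
        return future.failure(err)

    if data != b'\x00\x00\x00\x00':
        return future.failure(
            Errors.AuthenticationFailedError('Unrecognized response during authentication'))

    log.info('%s: Authenticated as %s via PLAIN', conn, conn.config['sasl_plain_username'])
    return future.success(True)
```

This is the same blocking exchange as before, just parameterized on the connection object — which is exactly why the private `BrokerConnection` helpers listed in the notes would need stable interfaces.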

kafka/sasl/__init__.py

Lines changed: 53 additions & 0 deletions
```diff
@@ -0,0 +1,53 @@
+import logging
+
+from kafka.sasl import gssapi, oauthbearer, plain, scram
+
+log = logging.getLogger(__name__)
+
+MECHANISMS = {
+    'GSSAPI': gssapi,
+    'OAUTHBEARER': oauthbearer,
+    'PLAIN': plain,
+    'SCRAM-SHA-256': scram,
+    'SCRAM-SHA-512': scram,
+}
+
+
+def register_mechanism(key, module):
+    """
+    Registers a custom SASL mechanism that can be used via sasl_mechanism={key}.
+
+    Example:
+        import kafka.sasl
+        from kafka import KafkaProducer
+        from mymodule import custom_sasl
+        kafka.sasl.register_mechanism('CUSTOM_SASL', custom_sasl)
+
+        producer = KafkaProducer(sasl_mechanism='CUSTOM_SASL')
+
+    Arguments:
+        key (str): The name of the mechanism returned by the broker and used
+            in the sasl_mechanism config value.
+        module (module): A module that implements the following methods...
+
+            def validate_config(conn: BrokerConnection) -> None:
+                # Raises an AssertionError for missing or invalid config values.
+
+            def try_authenticate(conn: BrokerConnection, future: Future) -> Future:
+                # Executes authentication routine and returns a resolved Future.
+
+    Raises:
+        AssertionError: The registered module does not define a required method.
+    """
+    assert callable(getattr(module, 'validate_config', None)), (
+        'Custom SASL mechanism {} must implement method #validate_config()'
+        .format(key)
+    )
+    assert callable(getattr(module, 'try_authenticate', None)), (
+        'Custom SASL mechanism {} must implement method #try_authenticate()'
+        .format(key)
+    )
+    if key in MECHANISMS:
+        log.warning('Overriding existing SASL mechanism {}'.format(key))
+
+    MECHANISMS[key] = module
```
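
The two assertions in `register_mechanism` are the entire contract a custom mechanism has to satisfy. A small hypothetical sketch of how the guard behaves (`my_incomplete_sasl` and `my_sasl` are made-up stub modules, not part of this change):

```py
import types

import kafka.sasl

# A module that only implements half of the contract is rejected up front.
my_incomplete_sasl = types.ModuleType('my_incomplete_sasl')
my_incomplete_sasl.validate_config = lambda conn: None

try:
    kafka.sasl.register_mechanism('MY_SASL', my_incomplete_sasl)
except AssertionError as e:
    print(e)  # Custom SASL mechanism MY_SASL must implement method #try_authenticate()

# Once both methods exist, registration succeeds and the mechanism becomes
# selectable via sasl_mechanism='MY_SASL'.
my_sasl = types.ModuleType('my_sasl')
my_sasl.validate_config = lambda conn: None
my_sasl.try_authenticate = lambda conn, future: future.success(True)

kafka.sasl.register_mechanism('MY_SASL', my_sasl)
assert 'MY_SASL' in kafka.sasl.MECHANISMS
```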
