Skip to content

Commit 63bf39e

Browse files
authored
Implement AWS IAM MSK Authent on top of version 1.9.2 (confluentinc#1)
Handle AWS IAM MSK Authent * Static creds (with and without security token) * EC2 Metadata (with IAM profile) * Web Authentication File (Iam2pod in kubernetes) This work was inspired by c339dc0 and 48e8639
1 parent 9b72ca3 commit 63bf39e

21 files changed

Lines changed: 2566 additions & 11 deletions

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ librdkafka v1.9.0 is a feature release:
8282
* Added `test.mock.broker.rtt` to simulate RTT/latency for mock brokers.
8383

8484

85+
86+
## Enhancements
87+
88+
* Added `AWS_MSK_IAM` to supported `sasl.mechanisms`. This feature
89+
provides support for using IAM authentication on AWS MSK clusters. (@garrett528, #3402)
90+
8591
## Fixes
8692

8793
### General fixes

CONFIGURATION.md

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
Property | C/P | Range | Default | Importance | Description
55
-----------------------------------------|-----|-----------------|--------------:|------------| --------------------------
6-
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
6+
builtin.features | * | | gzip, snappy, ssl, sasl, regex, lz4, sasl_gssapi, sasl_plain, sasl_scram, plugins, zstd, sasl_oauthbearer, http, oidc, sasl_aws_msk_iam | low | Indicates the builtin features for this build of librdkafka. An application can either query this value or attempt to set it with its list of required features to check for library support. <br>*Type: CSV flags*
77
client.id | * | | rdkafka | low | Client identifier. <br>*Type: string*
88
metadata.broker.list | * | | | high | Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
99
bootstrap.servers | * | | | high | Alias for `metadata.broker.list`: Initial list of brokers as a CSV list of broker host or host:port. The application may also use `rd_kafka_brokers_add()` to add brokers during runtime. <br>*Type: string*
@@ -81,15 +81,23 @@ ssl_engine_callback_data | * | |
8181
enable.ssl.certificate.verification | * | true, false | true | low | Enable OpenSSL's builtin broker (server) certificate verification. This verification can be extended by the application by implementing a certificate_verify_cb. <br>*Type: boolean*
8282
ssl.endpoint.identification.algorithm | * | none, https | none | low | Endpoint identification algorithm to validate broker hostname using broker certificate. https - Server (broker) hostname verification as specified in RFC2818. none - No endpoint verification. OpenSSL >= 1.0.2 required. <br>*Type: enum value*
8383
ssl.certificate.verify_cb | * | | | low | Callback to verify the broker certificate chain. <br>*Type: see dedicated API*
84-
sasl.mechanisms | * | | GSSAPI | high | SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. **NOTE**: Despite the name only one mechanism must be configured. <br>*Type: string*
85-
sasl.mechanism | * | | GSSAPI | high | Alias for `sasl.mechanisms`: SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER. **NOTE**: Despite the name only one mechanism must be configured. <br>*Type: string*
84+
sasl.mechanisms | * | | GSSAPI | high | SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM. **NOTE**: Despite the name only one mechanism must be configured. <br>*Type: string*
85+
sasl.mechanism | * | | GSSAPI | high | Alias for `sasl.mechanisms`: SASL mechanism to use for authentication. Supported: GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM. **NOTE**: Despite the name only one mechanism must be configured. <br>*Type: string*
8686
sasl.kerberos.service.name | * | | kafka | low | Kerberos principal name that Kafka runs as, not including /hostname@REALM <br>*Type: string*
8787
sasl.kerberos.principal | * | | kafkaclient | low | This client's Kerberos principal name. (Not supported on Windows, will use the logon user's principal). <br>*Type: string*
8888
sasl.kerberos.kinit.cmd | * | | kinit -R -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} \|\| kinit -t "%{sasl.kerberos.keytab}" -k %{sasl.kerberos.principal} | low | Shell command to refresh or acquire the client's Kerberos ticket. This command is executed on client creation and every sasl.kerberos.min.time.before.relogin (0=disable). %{config.prop.name} is replaced by corresponding config object value. <br>*Type: string*
8989
sasl.kerberos.keytab | * | | | low | Path to Kerberos keytab file. This configuration property is only used as a variable in `sasl.kerberos.kinit.cmd` as ` ... -t "%{sasl.kerberos.keytab}"`. <br>*Type: string*
9090
sasl.kerberos.min.time.before.relogin | * | 0 .. 86400000 | 60000 | low | Minimum time in milliseconds between key refresh attempts. Disable automatic key refresh by setting this property to 0. <br>*Type: integer*
9191
sasl.username | * | | | high | SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms <br>*Type: string*
9292
sasl.password | * | | | high | SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism <br>*Type: string*
93+
sasl.aws_access_key_id | * | | | high | SASL AWS access key id for use with the AWS_MSK_IAM mechanism. Default to $AWS_ACCESS_KEY_ID. <br>*Type: string*
94+
sasl.aws_secret_access_key | * | | | high | SASL AWS secret access key for use with the AWS_MSK_IAM mechanism. . Default to $AWS_SECRET_ACCESS_KEY. <br>*Type: string*
95+
sasl.aws_region | * | | | high | SASL AWS region for use with the AWS_MSK_IAM mechanism. Default to $AWS_DEFAULT_REGION. <br>*Type: string*
96+
sasl.aws_security_token | * | | | high | SASL AWS security for use with the AWS_MSK_IAM mechanism. Default to $AWS_SECURITY_TOKEN. <br>*Type: string*
97+
sasl.aws.role_arn | * | | | high | AWS RoleARN to use for calling STS. Default to $AWS_ROLE_ARN. <br>*Type: string*
98+
sasl.aws.web_identity_token_file. | * | | | high | AWS Web Identity token file to use for calling STS. Default to $AWS_WEB_IDENTITY_TOKEN_FILE. <br>*Type: string*
99+
sasl.aws.role.session.name | * | | | high | Session name to use for STS AssumeRole. Default to librdkafka. <br>*Type: string*
100+
sasl.aws.duration.sec | * | 900 .. 43200 | 900 | low | The duration, in seconds, of the role session. Minimum is 900 seconds (15 minutes) and max is 12 hours. This will default to 900 seconds if not set. <br>*Type: integer*
93101
sasl.oauthbearer.config | * | | | low | SASL/OAUTHBEARER configuration. The format is implementation-dependent and must be parsed accordingly. The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds. The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600. The scope value is CSV format with the default value being no/empty scope. For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`. For example: `principal=admin extension_traceId=123` <br>*Type: string*
94102
enable.sasl.oauthbearer.unsecure.jwt | * | true, false | false | low | Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. This builtin handler should only be used for development or testing, and not in production. <br>*Type: boolean*
95103
oauthbearer_token_refresh_cb | * | | | low | SASL/OAUTHBEARER token refresh callback (set with rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by rd_kafka_poll(), et.al. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token. Also see `rd_kafka_conf_enable_sasl_queue()`. <br>*Type: see dedicated API*

configure.self

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ void foo (void) {
149149

150150
if [[ $WITH_CURL == y ]]; then
151151
mkl_allvar_set WITH_OAUTHBEARER_OIDC WITH_OAUTHBEARER_OIDC y
152+
mkl_allvar_set WITH_SASL_AWS_MSK_IAM WITH_SASL_AWS_MSK_IAM y
152153
fi
153154
fi
154155

src/Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ endif
1212
SRCS_$(WITH_SASL_CYRUS) += rdkafka_sasl_cyrus.c
1313
SRCS_$(WITH_SASL_SCRAM) += rdkafka_sasl_scram.c
1414
SRCS_$(WITH_SASL_OAUTHBEARER) += rdkafka_sasl_oauthbearer.c
15+
SRCS_$(WITH_SASL_AWS_MSK_IAM) += rdkafka_sasl_aws_msk_iam.c rdkafka_aws.c
1516
SRCS_$(WITH_SNAPPY) += snappy.c
1617
SRCS_$(WITH_ZLIB) += rdgz.c
1718
SRCS_$(WITH_ZSTD) += rdkafka_zstd.c
@@ -55,7 +56,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
5556
rdkafka_txnmgr.c rdkafka_coord.c \
5657
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
5758
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
58-
rdkafka_error.c \
59+
rdkafka_error.c rdstringbuilder.c \
5960
$(SRCS_y)
6061

6162
HDRS= rdkafka.h rdkafka_mock.h

src/rdkafka.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@
5858
#if WITH_OAUTHBEARER_OIDC
5959
#include "rdkafka_sasl_oauthbearer_oidc.h"
6060
#endif
61+
#if WITH_SASL_AWS_MSK_IAM
62+
#include "rdkafka_sasl_aws_msk_iam.h"
63+
#endif
6164
#if WITH_SSL
6265
#include "rdkafka_ssl.h"
6366
#endif
@@ -2276,6 +2279,10 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
22762279
rd_kafka_conf_set_oauthbearer_token_refresh_cb(
22772280
&rk->rk_conf, rd_kafka_oidc_token_refresh_cb);
22782281
#endif
2282+
#if WITH_SASL_AWS_MSK_IAM
2283+
rk->rk_conf.enabled_events |=
2284+
RD_KAFKA_EVENT_AWS_MSK_IAM_CREDENTIAL_REFRESH;
2285+
#endif
22792286

22802287
rk->rk_controllerid = -1;
22812288

src/rdkafka.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2157,6 +2157,7 @@ RD_EXPORT
21572157
void rd_kafka_conf_enable_sasl_queue(rd_kafka_conf_t *conf, int enable);
21582158

21592159

2160+
21602161
/**
21612162
* @brief Set socket callback.
21622163
*
@@ -5141,6 +5142,8 @@ typedef int rd_kafka_event_type_t;
51415142
#define RD_KAFKA_EVENT_CREATEACLS_RESULT 0x400 /**< CreateAcls_result_t */
51425143
#define RD_KAFKA_EVENT_DESCRIBEACLS_RESULT 0x800 /**< DescribeAcls_result_t */
51435144
#define RD_KAFKA_EVENT_DELETEACLS_RESULT 0x1000 /**< DeleteAcls_result_t */
5145+
#define RD_KAFKA_EVENT_AWS_MSK_IAM_CREDENTIAL_REFRESH 0x2000 /**< SASL/AWS_MSK_IAM credentials need to be refreshed */
5146+
51445147

51455148
/**
51465149
* @returns the event type for the given event.

0 commit comments

Comments
 (0)