Skip to content

Commit 2d6494b

Browse files
authored
Extend Avro Converter to support Cross-Account schema registry access (#376)
* Ability to assume role for cross account schema access in avro-kafkaconnect-converter * Update readme with cross account details * Simplify logic by dynamically assuming role in existing converter class if arn is provided * Review changes
1 parent c6a9113 commit 2d6494b

File tree

4 files changed

+156
-9
lines changed

4 files changed

+156
-9
lines changed

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,30 @@ repository for the latest support: [Avro SerializationSchema and Deserialization
635635
properties);
636636
DataStream<GenericRecord> stream = env.addSource(consumer);
637637
```
638+
639+
## Cross-Account Avro Converter Support
640+
641+
The `AWSKafkaAvroConverter` Avro converter is able to assume an IAM role in a different AWS account before accessing Glue Schema Registry. You can configure the role ARN and an optional session name.
642+
643+
If `assumeRoleArn` is not provided, the converter will fallback to using the default credentials associated to the host.
644+
645+
### Connector configuration
646+
647+
Include these properties in your Kafka Connect worker or connector config:
648+
649+
```properties
650+
# Define converter
651+
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
652+
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
653+
654+
# Specify cross-account role arn
655+
key.converter.assumeRoleArn="arn:aws:iam::123456789012:role/my-role"
656+
value.converter.assumeRoleArn="arn:aws:iam::123456789012:role/my-role"
657+
658+
# Override default session name (optional; default is "kafka-connect-session")
659+
key.converter.assumeRoleSessionName=my-custom-session
660+
value.converter.assumeRoleSessionName=my-custom-session
661+
```
638662
639663
## Security issue notifications
640664
If you discover a potential security issue in this project we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/). Please do **not** create a public github issue.

avro-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/AWSKafkaAvroConverter.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,20 @@
2121
import com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroData;
2222
import com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.AvroDataConfig;
2323
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
24-
24+
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
25+
import com.google.common.annotations.VisibleForTesting;
2526
import lombok.Data;
2627
import lombok.extern.slf4j.Slf4j;
2728
import org.apache.kafka.common.errors.SerializationException;
2829
import org.apache.kafka.connect.data.Schema;
2930
import org.apache.kafka.connect.data.SchemaAndValue;
3031
import org.apache.kafka.connect.errors.DataException;
3132
import org.apache.kafka.connect.storage.Converter;
33+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
34+
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
35+
import software.amazon.awssdk.regions.Region;
36+
import software.amazon.awssdk.services.sts.StsClient;
37+
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
3238

3339
import java.util.Map;
3440

@@ -75,6 +81,21 @@ public void configure(Map<String, ?> configs, boolean isKey) {
7581
this.isKey = isKey;
7682
new AWSKafkaAvroConverterConfig(configs);
7783

84+
//TODO: add this feature to all other converters
85+
String roleToAssume = (String) configs.get(AWSSchemaRegistryConstants.ASSUME_ROLE_ARN);
86+
if (roleToAssume != null && !roleToAssume.isEmpty()) {
87+
String sessionName = configs.get(AWSSchemaRegistryConstants.ASSUME_ROLE_SESSION_NAME) != null
88+
? configs.get(AWSSchemaRegistryConstants.ASSUME_ROLE_SESSION_NAME).toString()
89+
: "kafka-connect-session";
90+
91+
String region = configs.get(AWSSchemaRegistryConstants.AWS_REGION).toString();
92+
93+
AwsCredentialsProvider credentialsProvider = getCredentialsProvider(roleToAssume, sessionName, region);
94+
95+
deserializer = new AWSKafkaAvroDeserializer(credentialsProvider, configs);
96+
serializer = new AWSKafkaAvroSerializer(credentialsProvider, configs);
97+
}
98+
7899
serializer.configure(configs, this.isKey);
79100
deserializer.configure(configs, this.isKey);
80101

@@ -123,4 +144,19 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
123144

124145
return avroData.toConnectData(avroSchema, deserialized);
125146
}
147+
148+
@VisibleForTesting
149+
protected AwsCredentialsProvider getCredentialsProvider(String roleArn, String sessionName, String region) {
150+
UrlConnectionHttpClient.Builder urlConnectionHttpClientBuilder = UrlConnectionHttpClient.builder();
151+
StsClient stsClient = StsClient.builder()
152+
.httpClient(urlConnectionHttpClientBuilder.build())
153+
.region(Region.of(region))
154+
.build();
155+
return StsAssumeRoleCredentialsProvider.builder()
156+
.refreshRequest(assumeRoleRequest -> assumeRoleRequest
157+
.roleArn(roleArn)
158+
.roleSessionName(sessionName))
159+
.stsClient(stsClient)
160+
.build();
161+
}
126162
}

avro-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/AWSKafkaAvroConverterTest.java

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@
4646
import java.util.Map;
4747
import java.util.UUID;
4848

49-
import static org.junit.jupiter.api.Assertions.assertEquals;
50-
import static org.junit.jupiter.api.Assertions.assertNotNull;
51-
import static org.junit.jupiter.api.Assertions.assertThrows;
52-
import static org.mockito.ArgumentMatchers.anyMap;
53-
import static org.mockito.ArgumentMatchers.eq;
54-
import static org.mockito.Mockito.mock;
55-
import static org.mockito.Mockito.when;
49+
import static com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.ASSUME_ROLE_ARN;
50+
import static com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.ASSUME_ROLE_SESSION_NAME;
51+
import static org.junit.Assert.assertFalse;
52+
import static org.junit.Assert.assertTrue;
53+
import static org.junit.jupiter.api.Assertions.*;
54+
import static org.mockito.ArgumentMatchers.*;
55+
import static org.mockito.Mockito.*;
5656

5757
/**
5858
* Unit tests for testing AWSKafkaAvroConverter class.
@@ -78,6 +78,8 @@ public class AWSKafkaAvroConverterTest {
7878
private static final UUID schemaVersionIdForTesting = UUID.fromString("b7b4a7f0-9c96-4e4a-a687-fb5de9ef0c63");
7979
private static final byte[] genericBytes = new byte[] {3, 0, -73, -76, -89, -16, -100, -106, 78, 74, -90, -121, -5,
8080
93, -23, -17, 12, 99, 10, 115, 97, 110, 115, 97, -58, 1, 6, 114, 101, 100};
81+
private static final String ROLE_ARN = "arn:aws:iam::123456789012:role/my-role";
82+
private static final String REGION = "us-west-2";
8183

8284
@BeforeEach
8385
public void setUp() {
@@ -149,6 +151,81 @@ public void testConverter_toConnectData_throwsException() {
149151
assertThrows(DataException.class, () -> converter.toConnectData(testTopic, serializedData));
150152
}
151153

154+
/**
155+
* Test AWSKafkaAvroConverter when value is null.
156+
*/
157+
@Test
158+
void testConverter_toConnectData_NullValue() {
159+
converter = spy(new AWSKafkaAvroConverter());
160+
assertEquals(SchemaAndValue.NULL, converter.toConnectData(testTopic, null));
161+
}
162+
163+
/**
164+
* Test AWSKafkaAvroConverter with assume role.
165+
*/
166+
@Test
167+
void testConverter_configure_invokeAssumeRoleWithCustomSession() {
168+
configs.put(ASSUME_ROLE_ARN, ROLE_ARN);
169+
configs.put(ASSUME_ROLE_SESSION_NAME, "my-session");
170+
171+
converter = spy(new AWSKafkaAvroConverter());
172+
doReturn(mockCredProvider)
173+
.when(converter)
174+
.getCredentialsProvider(anyString(), anyString(), anyString());
175+
176+
converter.configure(configs, true);
177+
178+
verify(converter).getCredentialsProvider(ROLE_ARN, "my-session", REGION);
179+
assertTrue(converter.isKey());
180+
assertNotNull(converter.getSerializer());
181+
assertNotNull(converter.getDeserializer());
182+
assertNotNull(converter.getAvroData());
183+
}
184+
185+
/**
186+
* Test AWSKafkaAvroConverter assume role, default session name.
187+
*/
188+
@Test
189+
void testConverter_configure_defaultSessionNameForAssumeRole() {
190+
configs.put(ASSUME_ROLE_ARN, ROLE_ARN);
191+
192+
converter = spy(new AWSKafkaAvroConverter());
193+
doReturn(mockCredProvider)
194+
.when(converter)
195+
.getCredentialsProvider(anyString(), anyString(), anyString());
196+
197+
converter.configure(configs, false);
198+
199+
verify(converter).getCredentialsProvider(ROLE_ARN, "kafka-connect-session", REGION);
200+
assertFalse(converter.isKey());
201+
}
202+
203+
/**
204+
* Test AWSKafkaAvroConverter assume role empty.
205+
*/
206+
@Test
207+
void testConverter_configure_noAssumeRoleIfArnIsEmpty() {
208+
configs.put(ASSUME_ROLE_ARN, "");
209+
210+
converter = spy(new AWSKafkaAvroConverter());
211+
converter.configure(configs, false);
212+
213+
verify(converter, never())
214+
.getCredentialsProvider(anyString(), anyString(), anyString());
215+
}
216+
217+
/**
218+
* Test AWSKafkaAvroConverter assume role null.
219+
*/
220+
@Test
221+
void testConverter_configure_noAssumeRoleIfArnIsNotProvided() {
222+
converter = spy(new AWSKafkaAvroConverter());
223+
converter.configure(getProperties(), false);
224+
225+
verify(converter, never())
226+
.getCredentialsProvider(anyString(), anyString(), anyString());
227+
}
228+
152229
/**
153230
* To create a AWSKafkaAvroSerializer instance with mocked parameters.
154231
*
@@ -226,7 +303,7 @@ private Struct createStructRecord() {
226303
private Map<String, Object> getProperties() {
227304
Map<String, Object> props = new HashMap<>();
228305

229-
props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
306+
props.put(AWSSchemaRegistryConstants.AWS_REGION, REGION);
230307
props.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, "https://test");
231308
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
232309
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,16 @@ public final class AWSSchemaRegistryConstants {
172172
*/
173173
public static final String USER_AGENT_APP = "userAgentApp";
174174

175+
/**
176+
* IAM Role ARN to assume for accessing the registry
177+
*/
178+
public static final String ASSUME_ROLE_ARN = "assumeRoleArn";
179+
180+
/**
181+
* IAM Role session name for accessing the registry
182+
*/
183+
public static final String ASSUME_ROLE_SESSION_NAME = "assumeRoleSessionName";
184+
175185
/**
176186
* Private constructor to avoid initialization of the class.
177187
*/

0 commit comments

Comments
 (0)