Skip to content

Commit 939e606

Browse files
Jithendar12AbdulR3hman
authored andcommitted
Add Support for OAuth in athena-synapse Connector
1 parent 007410b commit 939e606

File tree

8 files changed

+552
-13
lines changed

8 files changed

+552
-13
lines changed

athena-synapse/athena-synapse-connection.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ Resources:
102102
Statement:
103103
- Action:
104104
- secretsmanager:GetSecretValue
105+
- secretsmanager:PutSecretValue
105106
Effect: Allow
106107
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*'
107108
- Action:

athena-synapse/athena-synapse-package.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ Resources:
120120
Statement:
121121
- Action:
122122
- secretsmanager:GetSecretValue
123+
- secretsmanager:PutSecretValue
123124
Effect: Allow
124125
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretNamePrefix}*'
125126
- Action:

athena-synapse/athena-synapse.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ Resources:
124124
Statement:
125125
- Action:
126126
- secretsmanager:GetSecretValue
127+
- secretsmanager:PutSecretValue
127128
Effect: Allow
128129
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretNamePrefix}*'
129130
- Action:
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*-
2+
* #%L
3+
* athena-synapse
4+
* %%
5+
* Copyright (C) 2019 - 2025 Amazon Web Services
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package com.amazonaws.athena.connectors.synapse;
21+
22+
import com.amazonaws.athena.connector.credentials.CredentialsProvider;
23+
import com.amazonaws.athena.connector.credentials.DefaultCredentials;
24+
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
25+
import com.amazonaws.athena.connector.lambda.security.CachableSecretsManager;
26+
import com.fasterxml.jackson.databind.JsonNode;
27+
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import com.fasterxml.jackson.databind.node.ObjectNode;
29+
import software.amazon.awssdk.services.glue.model.ErrorDetails;
30+
import software.amazon.awssdk.services.glue.model.FederationSourceErrorCode;
31+
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
32+
import software.amazon.awssdk.services.secretsmanager.model.PutSecretValueRequest;
33+
34+
import java.net.URI;
35+
import java.net.http.HttpClient;
36+
import java.net.http.HttpRequest;
37+
import java.net.http.HttpResponse;
38+
import java.time.Duration;
39+
import java.time.Instant;
40+
import java.util.HashMap;
41+
import java.util.Map;
42+
43+
public class SynapseCredentialsProvider implements CredentialsProvider
44+
{
45+
private static final String ACCESS_TOKEN = "access_token";
46+
private static final String FETCHED_AT = "fetched_at";
47+
private static final String EXPIRES_IN = "expires_in";
48+
49+
private static final String CLIENT_ID = "client_id";
50+
private static final String CLIENT_SECRET = "client_secret";
51+
private static final String TENANT_ID = "tenant_id";
52+
53+
private static final String USER = "user";
54+
private static final String PASSWORD = "password";
55+
private static final String USERNAME = "username";
56+
57+
private static final String GRANT_TYPE = "client_credentials";
58+
private static final String SCOPE = "https://sql.azuresynapse.net/.default";
59+
private static final String TOKEN_ENDPOINT_TEMPLATE = "https://login.microsoftonline.com/%s/oauth2/v2.0/token";
60+
private static final long TOKEN_REFRESH_BUFFER_SECONDS = 300;
61+
62+
private final String secretName;
63+
private final CachableSecretsManager secretsManager;
64+
private final ObjectMapper objectMapper;
65+
66+
public SynapseCredentialsProvider(String secretName)
67+
{
68+
this.secretName = secretName;
69+
this.secretsManager = new CachableSecretsManager(SecretsManagerClient.create());
70+
this.objectMapper = new ObjectMapper();
71+
}
72+
73+
@Override
74+
public DefaultCredentials getCredential()
75+
{
76+
Map<String, String> credentialMap = getCredentialMap();
77+
return new DefaultCredentials(
78+
credentialMap.get(USER),
79+
credentialMap.get(PASSWORD)
80+
);
81+
}
82+
83+
@Override
84+
public Map<String, String> getCredentialMap()
85+
{
86+
try {
87+
String secretString = secretsManager.getSecret(secretName);
88+
Map<String, String> oauthConfig = objectMapper.readValue(secretString, Map.class);
89+
90+
Map<String, String> credentialMap = new HashMap<>();
91+
92+
if (!isOAuthConfigured(oauthConfig)) {
93+
credentialMap.put(USER, oauthConfig.get(USERNAME));
94+
credentialMap.put(PASSWORD, oauthConfig.get(PASSWORD));
95+
}
96+
97+
return credentialMap;
98+
}
99+
catch (Exception e) {
100+
throw new AthenaConnectorException(
101+
"Failed to retrieve Synapse credentials",
102+
ErrorDetails.builder()
103+
.errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString())
104+
.build()
105+
);
106+
}
107+
}
108+
109+
public String getOAuthAccessToken()
110+
{
111+
try {
112+
String secretValue = secretsManager.getSecret(secretName);
113+
Map<String, String> oauthConfig = objectMapper.readValue(secretValue, Map.class);
114+
115+
if (isOAuthConfigured(oauthConfig)) {
116+
return fetchAccessToken(oauthConfig);
117+
}
118+
return null;
119+
}
120+
catch (Exception e) {
121+
throw new AthenaConnectorException(
122+
"Failed to get OAuth access token",
123+
ErrorDetails.builder()
124+
.errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString())
125+
.build()
126+
);
127+
}
128+
}
129+
130+
private boolean isOAuthConfigured(Map<String, String> oauthConfig)
131+
{
132+
return oauthConfig.containsKey(CLIENT_ID) &&
133+
!oauthConfig.get(CLIENT_ID).isEmpty() &&
134+
oauthConfig.containsKey(CLIENT_SECRET) &&
135+
!oauthConfig.get(CLIENT_SECRET).isEmpty() &&
136+
oauthConfig.containsKey(TENANT_ID) &&
137+
!oauthConfig.get(TENANT_ID).isEmpty();
138+
}
139+
140+
private String fetchAccessToken(Map<String, String> oauthConfig) throws Exception
141+
{
142+
String accessToken = oauthConfig.get(ACCESS_TOKEN);
143+
144+
if (accessToken != null &&
145+
oauthConfig.containsKey(FETCHED_AT) &&
146+
oauthConfig.containsKey(EXPIRES_IN)) {
147+
long fetchedAt = Long.parseLong(oauthConfig.get(FETCHED_AT));
148+
long expiresIn = Long.parseLong(oauthConfig.get(EXPIRES_IN));
149+
long now = Instant.now().getEpochSecond();
150+
151+
if (now < (fetchedAt + expiresIn - TOKEN_REFRESH_BUFFER_SECONDS)) {
152+
return accessToken;
153+
}
154+
}
155+
156+
return fetchAndStoreNewToken(oauthConfig);
157+
}
158+
159+
private String fetchAndStoreNewToken(Map<String, String> oauthConfig) throws Exception
160+
{
161+
String clientId = oauthConfig.get(CLIENT_ID);
162+
String clientSecret = oauthConfig.get(CLIENT_SECRET);
163+
String tenantId = oauthConfig.get(TENANT_ID);
164+
165+
String tokenUrl = String.format(TOKEN_ENDPOINT_TEMPLATE, tenantId);
166+
String requestBody = String.format(
167+
"grant_type=%s&scope=%s&client_id=%s&client_secret=%s",
168+
GRANT_TYPE, SCOPE, clientId, clientSecret
169+
);
170+
171+
HttpClient client = HttpClient.newHttpClient();
172+
HttpRequest request = HttpRequest.newBuilder()
173+
.uri(URI.create(tokenUrl))
174+
.header("Content-Type", "application/x-www-form-urlencoded")
175+
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
176+
.timeout(Duration.ofSeconds(30))
177+
.build();
178+
179+
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
180+
181+
if (response.statusCode() != 200) {
182+
throw new AthenaConnectorException(
183+
"Failed to fetch access token",
184+
ErrorDetails.builder()
185+
.errorCode(FederationSourceErrorCode.INTERNAL_SERVICE_EXCEPTION.toString())
186+
.build()
187+
);
188+
}
189+
190+
JsonNode tokenResponse = objectMapper.readTree(response.body());
191+
String accessToken = tokenResponse.get(ACCESS_TOKEN).asText();
192+
long expiresIn = tokenResponse.get(EXPIRES_IN).asLong();
193+
long fetchedAt = Instant.now().getEpochSecond();
194+
195+
oauthConfig.put(ACCESS_TOKEN, accessToken);
196+
oauthConfig.put(EXPIRES_IN, String.valueOf(expiresIn));
197+
oauthConfig.put(FETCHED_AT, String.valueOf(fetchedAt));
198+
199+
ObjectNode updatedSecretJson = objectMapper.createObjectNode();
200+
for (Map.Entry<String, String> entry : oauthConfig.entrySet()) {
201+
updatedSecretJson.put(entry.getKey(), entry.getValue());
202+
}
203+
204+
secretsManager.getSecretsManager().putSecretValue(PutSecretValueRequest.builder()
205+
.secretId(secretName)
206+
.secretString(updatedSecretJson.toString())
207+
.build());
208+
209+
return accessToken;
210+
}
211+
}

athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseJdbcConnectionFactory.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package com.amazonaws.athena.connectors.synapse;
2121

2222
import com.amazonaws.athena.connector.credentials.CredentialsProvider;
23+
import com.amazonaws.athena.connector.credentials.DefaultCredentials;
2324
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
2425
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
2526
import com.amazonaws.athena.connectors.jdbc.connection.GenericJdbcConnectionFactory;
@@ -59,25 +60,44 @@ public Connection getConnection(final CredentialsProvider credentialsProvider)
5960
{
6061
try {
6162
final String derivedJdbcString;
62-
if (null != credentialsProvider) {
63+
final Properties connectionProps = new Properties();
64+
connectionProps.putAll(this.jdbcProperties);
65+
66+
if (credentialsProvider != null) {
6367
Matcher secretMatcher = SECRET_NAME_PATTERN.matcher(databaseConnectionConfig.getJdbcConnectionString());
6468
final String secretReplacement;
65-
if (databaseConnectionConfig.getJdbcConnectionString().contains("authentication=ActiveDirectoryServicePrincipal")) {
66-
// Set AADSecurePrincipal credentials
69+
70+
String connectionString = databaseConnectionConfig.getJdbcConnectionString();
71+
72+
if (connectionString.contains("authentication=ActiveDirectoryServicePrincipal")) {
73+
// AAD Service Principal credentials
74+
DefaultCredentials credentials = credentialsProvider.getCredential();
6775
secretReplacement = String.format(
68-
"%s;%s",
69-
"AADSecurePrincipalId=" + credentialsProvider.getCredential().getUser(),
70-
"AADSecurePrincipalSecret=" + credentialsProvider.getCredential().getPassword()
76+
"%s;%s",
77+
"AADSecurePrincipalId=" + credentials.getUser(),
78+
"AADSecurePrincipalSecret=" + credentials.getPassword()
7179
);
7280
}
7381
else {
74-
// replace aws secret value with credentials and change username as user
75-
secretReplacement = String.format(
76-
"%s;%s",
77-
"user=" + credentialsProvider.getCredential().getUser(),
78-
"password=" + credentialsProvider.getCredential().getPassword()
79-
);
82+
SynapseCredentialsProvider synapseProvider = (SynapseCredentialsProvider) credentialsProvider;
83+
String accessToken = synapseProvider.getOAuthAccessToken();
84+
85+
if (accessToken != null) {
86+
// OAuth token
87+
connectionProps.setProperty("accessToken", accessToken);
88+
secretReplacement = "";
89+
}
90+
else {
91+
// Fallback to username/password and change username as user
92+
DefaultCredentials credentials = synapseProvider.getCredential();
93+
secretReplacement = String.format(
94+
"%s;%s",
95+
"user=" + credentials.getUser(),
96+
"password=" + credentials.getPassword()
97+
);
98+
}
8099
}
100+
81101
derivedJdbcString = secretMatcher.replaceAll(Matcher.quoteReplacement(secretReplacement));
82102
}
83103
else {
@@ -86,7 +106,7 @@ public Connection getConnection(final CredentialsProvider credentialsProvider)
86106
// register driver
87107
Class.forName(databaseConnectionInfo.getDriverClassName()).newInstance();
88108
// create connection
89-
return DriverManager.getConnection(derivedJdbcString, this.jdbcProperties);
109+
return DriverManager.getConnection(derivedJdbcString, connectionProps);
90110
}
91111
catch (SQLException sqlException) {
92112
throw new RuntimeException(sqlException.getErrorCode() + ": " + sqlException);

athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseMetadataHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020
package com.amazonaws.athena.connectors.synapse;
2121

22+
import com.amazonaws.athena.connector.credentials.CredentialsProvider;
2223
import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
2324
import com.amazonaws.athena.connector.lambda.data.Block;
2425
import com.amazonaws.athena.connector.lambda.data.BlockAllocator;
@@ -55,6 +56,7 @@
5556
import org.apache.arrow.vector.types.Types;
5657
import org.apache.arrow.vector.types.pojo.ArrowType;
5758
import org.apache.arrow.vector.types.pojo.Schema;
59+
import org.apache.commons.lang3.StringUtils;
5860
import org.slf4j.Logger;
5961
import org.slf4j.LoggerFactory;
6062
import org.stringtemplate.v4.ST;
@@ -480,4 +482,15 @@ private SchemaBuilder doDataTypeConversionForNonCompatible(Connection jdbcConnec
480482
}
481483
return schemaBuilder;
482484
}
485+
486+
@Override
487+
protected CredentialsProvider getCredentialProvider()
488+
{
489+
final String secretName = getDatabaseConnectionConfig().getSecret();
490+
if (StringUtils.isNotBlank(secretName)) {
491+
return new SynapseCredentialsProvider(secretName);
492+
}
493+
494+
return null;
495+
}
483496
}

athena-synapse/src/main/java/com/amazonaws/athena/connectors/synapse/SynapseRecordHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020
package com.amazonaws.athena.connectors.synapse;
2121

22+
import com.amazonaws.athena.connector.credentials.CredentialsProvider;
2223
import com.amazonaws.athena.connector.lambda.QueryStatusChecker;
2324
import com.amazonaws.athena.connector.lambda.data.Block;
2425
import com.amazonaws.athena.connector.lambda.data.BlockSpiller;
@@ -37,6 +38,7 @@
3738
import org.apache.arrow.vector.types.pojo.ArrowType;
3839
import org.apache.arrow.vector.types.pojo.Field;
3940
import org.apache.arrow.vector.types.pojo.Schema;
41+
import org.apache.commons.lang3.StringUtils;
4042
import org.apache.commons.lang3.Validate;
4143
import org.slf4j.Logger;
4244
import org.slf4j.LoggerFactory;
@@ -140,4 +142,15 @@ public void readWithConstraint(BlockSpiller blockSpiller, ReadRecordsRequest rea
140142
}
141143
}
142144
}
145+
146+
@Override
147+
protected CredentialsProvider getCredentialProvider()
148+
{
149+
final String secretName = getDatabaseConnectionConfig().getSecret();
150+
if (StringUtils.isNotBlank(secretName)) {
151+
return new SynapseCredentialsProvider(secretName);
152+
}
153+
154+
return null;
155+
}
143156
}

0 commit comments

Comments
 (0)