Skip to content

Commit dd2015a

Browse files
Snowflake Connector Performance Improvement (awslabs#2665)
Co-authored-by: Samarth <[email protected]>
1 parent 7b4c686 commit dd2015a

20 files changed

+2331
-989
lines changed

athena-snowflake/athena-snowflake-connection.yaml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ Parameters:
4242
Description: "(Optional) A custom role to be used by the Connector lambda"
4343
Type: String
4444
Default: ""
45+
EnableS3Export:
46+
Description: '(Optional) Enable S3 export functionality for data transfer. Set to true to use S3 export path, false for direct query path.'
47+
Type: String
48+
Default: 'false'
49+
AllowedValues:
50+
- 'true'
51+
- 'false'
4552

4653
Conditions:
4754
HasSecurityGroups: !Not [ !Equals [ !Join ["", !Ref SecurityGroupIds], "" ] ]
@@ -59,6 +66,7 @@ Resources:
5966
Environment:
6067
Variables:
6168
glue_connection: !Ref GlueConnection
69+
SNOWFLAKE_ENABLE_S3_EXPORT: !Ref EnableS3Export
6270
FunctionName: !Ref LambdaFunctionName
6371
PackageType: "Image"
6472
ImageUri: !Sub
@@ -127,14 +135,15 @@ Resources:
127135
Resource: '*'
128136
- Action:
129137
- s3:GetObject
138+
- s3:GetObjectVersion
130139
- s3:ListBucket
131140
- s3:GetBucketLocation
132-
- s3:GetObjectVersion
133141
- s3:PutObject
134142
- s3:PutObjectAcl
135143
- s3:GetLifecycleConfiguration
136144
- s3:PutLifecycleConfiguration
137145
- s3:DeleteObject
146+
- s3:DeleteObjectVersion
138147
Effect: Allow
139148
Resource:
140149
- Fn::Sub:
@@ -151,6 +160,10 @@ Resources:
151160
Resource:
152161
- !Sub 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:connection/${GlueConnection}'
153162
- !Sub 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
163+
- Action:
164+
- lambda:GetFunction
165+
Effect: Allow
166+
Resource: !Sub 'arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:${LambdaFunctionName}'
154167

155168
FunctionKmsPolicy:
156169
Condition: CreateKmsPolicy

athena-snowflake/athena-snowflake.yaml

Lines changed: 94 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,24 @@ Parameters:
5454
Description: "(Optional) An IAM policy ARN to use as the PermissionsBoundary for the created Lambda function's execution role"
5555
Default: ''
5656
Type: String
57+
LambdaRoleArn:
58+
Description: "(Optional) A custom role to be used by the Connector lambda"
59+
Type: String
60+
Default: ""
61+
EnableS3Export:
62+
Description: '(Optional) Enable S3 export functionality for data transfer. Set to true to use S3 export path, false for direct query path.'
63+
Type: String
64+
Default: 'false'
65+
AllowedValues:
66+
- 'true'
67+
- 'false'
5768
Conditions:
5869
HasPermissionsBoundary: !Not [ !Equals [ !Ref PermissionsBoundaryARN, "" ] ]
5970
HasSecurityGroups: !Not [ !Equals [ !Join ["", !Ref SecurityGroupIds], "" ] ]
6071
HasSubnets: !Not [ !Equals [ !Join ["", !Ref SubnetIds], "" ] ]
6172
IsRegionBAH: !Equals [!Ref "AWS::Region", "me-south-1"]
6273
IsRegionHKG: !Equals [!Ref "AWS::Region", "ap-east-1"]
74+
NotHasLambdaRole: !Equals [ !Ref LambdaRoleArn, "" ]
6375
Resources:
6476
JdbcConnectorConfig:
6577
Type: 'AWS::Serverless::Function'
@@ -70,6 +82,7 @@ Resources:
7082
spill_bucket: !Ref SpillBucket
7183
spill_prefix: !Ref SpillPrefix
7284
default: !Ref DefaultConnectionString
85+
SNOWFLAKE_ENABLE_S3_EXPORT: !Ref EnableS3Export
7386
FunctionName: !Ref LambdaFunctionName
7487
PackageType: "Image"
7588
ImageUri: !Sub
@@ -80,40 +93,87 @@ Resources:
8093
Description: "Enables Amazon Athena to communicate with Snowflake using JDBC"
8194
Timeout: !Ref LambdaTimeout
8295
MemorySize: !Ref LambdaMemory
96+
Role: !If [ NotHasLambdaRole, !GetAtt FunctionRole.Arn, !Ref LambdaRoleArn ]
97+
VpcConfig:
98+
SecurityGroupIds: !If [ HasSecurityGroups, !Ref SecurityGroupIds, !Ref "AWS::NoValue" ]
99+
SubnetIds: !If [ HasSubnets, !Ref SubnetIds, !Ref "AWS::NoValue" ]
100+
101+
FunctionRole:
102+
Condition: NotHasLambdaRole
103+
Type: AWS::IAM::Role
104+
Properties:
83105
PermissionsBoundary: !If [ HasPermissionsBoundary, !Ref PermissionsBoundaryARN, !Ref "AWS::NoValue" ]
84-
Policies:
85-
- Statement:
86-
- Action:
87-
- secretsmanager:GetSecretValue
88-
- secretsmanager:PutSecretValue
89-
Effect: Allow
90-
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretNamePrefix}*'
91-
Version: '2012-10-17'
92-
- Statement:
93-
- Action:
94-
- logs:CreateLogGroup
95-
Effect: Allow
96-
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:*'
97-
Version: '2012-10-17'
98-
- Statement:
99-
- Action:
100-
- logs:CreateLogStream
101-
- logs:PutLogEvents
102-
Effect: Allow
103-
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*'
104-
Version: '2012-10-17'
105-
- Statement:
106-
- Action:
107-
- athena:GetQueryExecution
108-
Effect: Allow
109-
Resource: '*'
110-
Version: '2012-10-17'
106+
ManagedPolicyArns:
107+
- !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
108+
AssumeRolePolicyDocument:
109+
Version: 2012-10-17
110+
Statement:
111+
- Effect: Allow
112+
Principal:
113+
Service:
114+
- lambda.amazonaws.com
115+
Action:
116+
- "sts:AssumeRole"
117+
118+
FunctionExecutionPolicy:
119+
Condition: NotHasLambdaRole
120+
Type: "AWS::IAM::Policy"
121+
Properties:
122+
PolicyName: FunctionExecutionPolicy
123+
PolicyDocument:
124+
Version: 2012-10-17
125+
Statement:
126+
- Action:
127+
- secretsmanager:GetSecretValue
128+
- secretsmanager:PutSecretValue
129+
Effect: Allow
130+
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretNamePrefix}*'
131+
- Action:
132+
- logs:CreateLogGroup
133+
Effect: Allow
134+
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:*'
135+
- Action:
136+
- logs:CreateLogStream
137+
- logs:PutLogEvents
138+
Effect: Allow
139+
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*'
140+
- Action:
141+
- lambda:GetFunction
142+
Effect: Allow
143+
Resource: !Sub 'arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:${LambdaFunctionName}'
144+
- Action:
145+
- athena:GetQueryExecution
146+
Effect: Allow
147+
Resource: '*'
148+
- Action:
149+
- ec2:CreateNetworkInterface
150+
- ec2:DeleteNetworkInterface
151+
- ec2:DescribeNetworkInterfaces
152+
- ec2:DetachNetworkInterface
153+
Effect: Allow
154+
Resource: '*'
111155
#S3CrudPolicy allows our connector to spill large responses to S3. You can optionally replace this pre-made policy
112156
#with one that is more restrictive and can only 'put' but not read, delete, or overwrite files.
113-
- S3CrudPolicy:
114-
BucketName: !Ref SpillBucket
115-
#VPCAccessPolicy allows our connector to run in a VPC so that it can access your data source.
116-
- VPCAccessPolicy: {}
117-
VpcConfig:
118-
SecurityGroupIds: !If [ HasSecurityGroups, !Ref SecurityGroupIds, !Ref "AWS::NoValue" ]
119-
SubnetIds: !If [ HasSubnets, !Ref SubnetIds, !Ref "AWS::NoValue" ]
157+
- Action:
158+
- s3:GetObject
159+
- s3:GetObjectVersion
160+
- s3:ListBucket
161+
- s3:GetBucketLocation
162+
- s3:PutObject
163+
- s3:PutObjectAcl
164+
- s3:GetLifecycleConfiguration
165+
- s3:PutLifecycleConfiguration
166+
- s3:DeleteObject
167+
- s3:DeleteObjectVersion
168+
Effect: Allow
169+
Resource:
170+
- Fn::Sub:
171+
- arn:${AWS::Partition}:s3:::${bucketName}
172+
- bucketName:
173+
Ref: SpillBucket
174+
- Fn::Sub:
175+
- arn:${AWS::Partition}:s3:::${bucketName}/*
176+
- bucketName:
177+
Ref: SpillBucket
178+
Roles:
179+
- !Ref FunctionRole

athena-snowflake/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,31 @@
2020
<artifactId>athena-jdbc</artifactId>
2121
<version>2022.47.1</version>
2222
</dependency>
23+
<dependency>
24+
<groupId>org.apache.arrow</groupId>
25+
<artifactId>arrow-dataset</artifactId>
26+
<version>${apache.arrow.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.arrow</groupId>
30+
<artifactId>arrow-c-data</artifactId>
31+
<version>${apache.arrow.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.apache.arrow</groupId>
35+
<artifactId>arrow-memory-core</artifactId>
36+
<version>${apache.arrow.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>net.java.dev.jna</groupId>
40+
<artifactId>jna-platform</artifactId>
41+
<version>5.16.0</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>net.java.dev.jna</groupId>
45+
<artifactId>jna</artifactId>
46+
<version>5.16.0</version>
47+
</dependency>
2348
<dependency>
2449
<groupId>com.amazonaws</groupId>
2550
<artifactId>athena-jdbc</artifactId>

athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeCompositeHandler.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,26 @@
2424

2525
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;
2626

27+
import java.io.IOException;
28+
import java.security.KeyStoreException;
29+
import java.security.NoSuchAlgorithmException;
30+
import java.security.cert.CertificateEncodingException;
31+
32+
import static com.amazonaws.athena.connectors.snowflake.SnowflakeUtils.installCaCertificate;
33+
import static com.amazonaws.athena.connectors.snowflake.SnowflakeUtils.setupNativeEnvironmentVariables;
34+
2735
/**
2836
* Boilerplate composite handler that allows us to use a single Lambda function for both
2937
* Metadata and Data. In this case we just compose {@link SnowflakeMetadataHandler} and {@link SnowflakeRecordHandler}.
3038
*
31-
* Recommend using {@link SnowflakeMuxCompositeHandler} instead.
3239
*/
3340
public class SnowflakeCompositeHandler
3441
extends CompositeHandler
3542
{
36-
public SnowflakeCompositeHandler()
43+
public SnowflakeCompositeHandler() throws CertificateEncodingException, IOException, NoSuchAlgorithmException, KeyStoreException
3744
{
38-
super(new SnowflakeMetadataHandler(new SnowflakeEnvironmentProperties().createEnvironment()), new SnowflakeRecordHandler(new SnowflakeEnvironmentProperties().createEnvironment()));
45+
super(new SnowflakeMetadataHandler(new SnowflakeEnvironmentProperties(System.getenv()).createEnvironment()), new SnowflakeRecordHandler(new SnowflakeEnvironmentProperties(System.getenv()).createEnvironment()));
46+
installCaCertificate();
47+
setupNativeEnvironmentVariables();
3948
}
4049
}

athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeConstants.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@ public final class SnowflakeConstants
3535
*/
3636
public static final int SINGLE_SPLIT_LIMIT_COUNT = 10000;
3737
public static final String SNOWFLAKE_QUOTE_CHARACTER = "\"";
38-
38+
/**
39+
* SSL file location constants used to store the SSL certificate.
40+
* The file location is fixed in the /tmp directory
41+
* so that the SSL certificate location can be retrieved.
42+
*/
43+
public static final String SSL_CERT_FILE_LOCATION = "SSL_CERT_FILE";
44+
public static final String SSL_CERT_FILE_LOCATION_VALUE = "/tmp/cacert.pem";
45+
public static final String SNOWFLAKE_SPLIT_QUERY_ID = "query_id";
46+
public static final String SNOWFLAKE_SPLIT_EXPORT_BUCKET = "exportBucket";
47+
public static final String SNOWFLAKE_SPLIT_OBJECT_KEY = "s3ObjectKey";
3948
/** Configuration key for specifying the authentication method */
4049
public static final String AUTHENTICATOR = "authenticator";
4150

@@ -57,7 +66,7 @@ public final class SnowflakeConstants
5766
public static final String PEM_PRIVATE_KEY = "pem_private_key";
5867
public static final String PEM_PRIVATE_KEY_PASSPHRASE = "pem_private_key_passphrase";
5968
public static final String PRIVATE_KEY = "privateKey";
60-
69+
6170
/**
6271
* Password Authentication Constants
6372
* These constants are used for traditional username/password authentication with Snowflake.

athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeEnvironmentProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ public class SnowflakeEnvironmentProperties extends JdbcEnvironmentProperties
4343
private static final String DB_PROPERTY_KEY = "db";
4444
private static final String SCHEMA_PROPERTY_KEY = "schema";
4545
private static final String SNOWFLAKE_ESCAPE_CHARACTER = "\"";
46+
public static final String ENABLE_S3_EXPORT = "SNOWFLAKE_ENABLE_S3_EXPORT";
47+
48+
private final boolean enableS3Export;
49+
50+
public SnowflakeEnvironmentProperties(Map<String, String> properties)
51+
{
52+
this.enableS3Export = Boolean.parseBoolean(properties.getOrDefault(ENABLE_S3_EXPORT, "false"));
53+
}
4654

4755
@Override
4856
public Map<String, String> connectionPropertiesToEnvironment(Map<String, String> connectionProperties)
@@ -134,4 +142,9 @@ public static Map<String, String> getSnowFlakeParameter(Map<String, String> base
134142

135143
return parameters;
136144
}
145+
146+
public boolean isS3ExportEnabled()
147+
{
148+
return enableS3Export;
149+
}
137150
}

0 commit comments

Comments
 (0)