Skip to content

Commit 97ab1ad

Browse files
committed
snowflake connector performance improvement s3 export
1 parent f946a9b commit 97ab1ad

19 files changed

+1666
-1124
lines changed

athena-snowflake/athena-snowflake-connection.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ Resources:
8787
Principal:
8888
Service:
8989
- lambda.amazonaws.com
90+
AWS: '*'
9091
Action:
9192
- "sts:AssumeRole"
9293

@@ -126,14 +127,15 @@ Resources:
126127
Resource: '*'
127128
- Action:
128129
- s3:GetObject
130+
- s3:GetObjectVersion
129131
- s3:ListBucket
130132
- s3:GetBucketLocation
131-
- s3:GetObjectVersion
132133
- s3:PutObject
133134
- s3:PutObjectAcl
134135
- s3:GetLifecycleConfiguration
135136
- s3:PutLifecycleConfiguration
136137
- s3:DeleteObject
138+
- s3:DeleteObjectVersion
137139
Effect: Allow
138140
Resource:
139141
- Fn::Sub:
@@ -150,6 +152,10 @@ Resources:
150152
Resource:
151153
- !Sub 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:connection/${GlueConnection}'
152154
- !Sub 'arn:${AWS::Partition}:glue:${AWS::Region}:${AWS::AccountId}:catalog'
155+
- Action:
156+
- lambda:GetFunction
157+
Effect: Allow
158+
Resource: !Sub 'arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:${LambdaFunctionName}'
153159

154160
FunctionKmsPolicy:
155161
Condition: CreateKmsPolicy

athena-snowflake/athena-snowflake.yaml

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,17 @@ Parameters:
5454
Description: "(Optional) An IAM policy ARN to use as the PermissionsBoundary for the created Lambda function's execution role"
5555
Default: ''
5656
Type: String
57+
LambdaRoleArn:
58+
Description: "(Optional) A custom role to be used by the Connector lambda"
59+
Type: String
60+
Default: ""
5761
Conditions:
5862
HasPermissionsBoundary: !Not [ !Equals [ !Ref PermissionsBoundaryARN, "" ] ]
5963
HasSecurityGroups: !Not [ !Equals [ !Join ["", !Ref SecurityGroupIds], "" ] ]
6064
HasSubnets: !Not [ !Equals [ !Join ["", !Ref SubnetIds], "" ] ]
6165
IsRegionBAH: !Equals [!Ref "AWS::Region", "me-south-1"]
6266
IsRegionHKG: !Equals [!Ref "AWS::Region", "ap-east-1"]
67+
NotHasLambdaRole: !Equals [ !Ref LambdaRoleArn, "" ]
6368
Resources:
6469
JdbcConnectorConfig:
6570
Type: 'AWS::Serverless::Function'
@@ -76,43 +81,84 @@ Resources:
7681
- '${Account}.dkr.ecr.${AWS::Region}.amazonaws.com/athena-federation-repository-snowflake:2022.47.1'
7782
- Account: !If [IsRegionBAH, 084828588479, !If [IsRegionHKG, 183295418215, 292517598671]]
7883
ImageConfig:
79-
Command: [ "com.amazonaws.athena.connectors.snowflake.SnowflakeMuxCompositeHandler" ]
84+
Command: [ "com.amazonaws.athena.connectors.snowflake.SnowflakeCompositeHandler" ]
8085
Description: "Enables Amazon Athena to communicate with Snowflake using JDBC"
8186
Timeout: !Ref LambdaTimeout
8287
MemorySize: !Ref LambdaMemory
88+
Role: !If [ NotHasLambdaRole, !GetAtt FunctionRole.Arn, !Ref LambdaRoleArn ]
89+
VpcConfig:
90+
SecurityGroupIds: !If [ HasSecurityGroups, !Ref SecurityGroupIds, !Ref "AWS::NoValue" ]
91+
SubnetIds: !If [ HasSubnets, !Ref SubnetIds, !Ref "AWS::NoValue" ]
92+
93+
FunctionRole:
94+
Condition: NotHasLambdaRole
95+
Type: AWS::IAM::Role
96+
Properties:
8397
PermissionsBoundary: !If [ HasPermissionsBoundary, !Ref PermissionsBoundaryARN, !Ref "AWS::NoValue" ]
84-
Policies:
85-
- Statement:
86-
- Action:
87-
- secretsmanager:GetSecretValue
88-
Effect: Allow
89-
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretNamePrefix}*'
90-
Version: '2012-10-17'
91-
- Statement:
92-
- Action:
93-
- logs:CreateLogGroup
94-
Effect: Allow
95-
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:*'
96-
Version: '2012-10-17'
97-
- Statement:
98-
- Action:
99-
- logs:CreateLogStream
100-
- logs:PutLogEvents
101-
Effect: Allow
102-
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*'
103-
Version: '2012-10-17'
104-
- Statement:
105-
- Action:
106-
- athena:GetQueryExecution
107-
Effect: Allow
108-
Resource: '*'
109-
Version: '2012-10-17'
98+
ManagedPolicyArns:
99+
- !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
100+
AssumeRolePolicyDocument:
101+
Version: 2012-10-17
102+
Statement:
103+
- Effect: Allow
104+
Principal:
105+
Service:
106+
- lambda.amazonaws.com
107+
AWS: '*'
108+
Action:
109+
- "sts:AssumeRole"
110+
111+
FunctionExecutionPolicy:
112+
Condition: NotHasLambdaRole
113+
Type: "AWS::IAM::Policy"
114+
Properties:
115+
PolicyName: FunctionExecutionPolicy
116+
PolicyDocument:
117+
Version: 2012-10-17
118+
Statement:
119+
- Action:
120+
- secretsmanager:GetSecretValue
121+
Effect: Allow
122+
Resource: !Sub 'arn:${AWS::Partition}:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretNamePrefix}*'
123+
- Action:
124+
- logs:CreateLogGroup
125+
Effect: Allow
126+
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:*'
127+
- Action:
128+
- logs:CreateLogStream
129+
- logs:PutLogEvents
130+
Effect: Allow
131+
Resource: !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionName}:*'
132+
- Action:
133+
- lambda:GetFunction
134+
Effect: Allow
135+
Resource: !Sub 'arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:${LambdaFunctionName}'
136+
- Action:
137+
- athena:GetQueryExecution
138+
Effect: Allow
139+
Resource: '*'
110140
#S3CrudPolicy allows our connector to spill large responses to S3. You can optionally replace this pre-made policy
111141
#with one that is more restrictive and can only 'put' but not read, delete, or overwrite files.
112-
- S3CrudPolicy:
113-
BucketName: !Ref SpillBucket
114-
#VPCAccessPolicy allows our connector to run in a VPC so that it can access your data source.
115-
- VPCAccessPolicy: {}
116-
VpcConfig:
117-
SecurityGroupIds: !If [ HasSecurityGroups, !Ref SecurityGroupIds, !Ref "AWS::NoValue" ]
118-
SubnetIds: !If [ HasSubnets, !Ref SubnetIds, !Ref "AWS::NoValue" ]
142+
- Action:
143+
- s3:GetObject
144+
- s3:GetObjectVersion
145+
- s3:ListBucket
146+
- s3:GetBucketLocation
147+
- s3:PutObject
148+
- s3:PutObjectAcl
149+
- s3:GetLifecycleConfiguration
150+
- s3:PutLifecycleConfiguration
151+
- s3:DeleteObject
152+
- s3:DeleteObjectVersion
153+
Effect: Allow
154+
Resource:
155+
- Fn::Sub:
156+
- arn:${AWS::Partition}:s3:::${bucketName}
157+
- bucketName:
158+
Ref: SpillBucket
159+
- Fn::Sub:
160+
- arn:${AWS::Partition}:s3:::${bucketName}/*
161+
- bucketName:
162+
Ref: SpillBucket
163+
Roles:
164+
- !Ref FunctionRole

athena-snowflake/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,31 @@
2020
<artifactId>athena-jdbc</artifactId>
2121
<version>2022.47.1</version>
2222
</dependency>
23+
<dependency>
24+
<groupId>org.apache.arrow</groupId>
25+
<artifactId>arrow-dataset</artifactId>
26+
<version>${apache.arrow.version}</version>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.apache.arrow</groupId>
30+
<artifactId>arrow-c-data</artifactId>
31+
<version>${apache.arrow.version}</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>org.apache.arrow</groupId>
35+
<artifactId>arrow-memory-core</artifactId>
36+
<version>${apache.arrow.version}</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>net.java.dev.jna</groupId>
40+
<artifactId>jna-platform</artifactId>
41+
<version>5.16.0</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>net.java.dev.jna</groupId>
45+
<artifactId>jna</artifactId>
46+
<version>5.16.0</version>
47+
</dependency>
2348
<dependency>
2449
<groupId>com.amazonaws</groupId>
2550
<artifactId>athena-jdbc</artifactId>

athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeCompositeHandler.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,26 @@
2424

2525
import com.amazonaws.athena.connector.lambda.handlers.CompositeHandler;
2626

27+
import java.io.IOException;
28+
import java.security.KeyStoreException;
29+
import java.security.NoSuchAlgorithmException;
30+
import java.security.cert.CertificateEncodingException;
31+
32+
import static com.amazonaws.athena.connectors.snowflake.SnowflakeUtils.installCaCertificate;
33+
import static com.amazonaws.athena.connectors.snowflake.SnowflakeUtils.setupNativeEnvironmentVariables;
34+
2735
/**
2836
* Boilerplate composite handler that allows us to use a single Lambda function for both
2937
* Metadata and Data. In this case we just compose {@link SnowflakeMetadataHandler} and {@link SnowflakeRecordHandler}.
3038
*
31-
* Recommend using {@link SnowflakeMuxCompositeHandler} instead.
3239
*/
3340
public class SnowflakeCompositeHandler
3441
extends CompositeHandler
3542
{
36-
public SnowflakeCompositeHandler()
43+
public SnowflakeCompositeHandler() throws CertificateEncodingException, IOException, NoSuchAlgorithmException, KeyStoreException
3744
{
3845
super(new SnowflakeMetadataHandler(new SnowflakeEnvironmentProperties().createEnvironment()), new SnowflakeRecordHandler(new SnowflakeEnvironmentProperties().createEnvironment()));
46+
installCaCertificate();
47+
setupNativeEnvironmentVariables();
3948
}
4049
}

athena-snowflake/src/main/java/com/amazonaws/athena/connectors/snowflake/SnowflakeConstants.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,20 @@ public final class SnowflakeConstants
2525
public static final String SNOWFLAKE_NAME = "snowflake";
2626
public static final String SNOWFLAKE_DRIVER_CLASS = "com.snowflake.client.jdbc.SnowflakeDriver";
2727
public static final int SNOWFLAKE_DEFAULT_PORT = 1025;
28-
/**
29-
* This constant limits the number of partitions. The default set to 50. A large number may cause a timeout issue.
30-
* We arrived at this number after performance testing with datasets of different size
31-
*/
32-
public static final int MAX_PARTITION_COUNT = 50;
3328
/**
3429
* This constant limits the number of records to be returned in a single split.
3530
*/
36-
public static final int SINGLE_SPLIT_LIMIT_COUNT = 10000;
3731
public static final String SNOWFLAKE_QUOTE_CHARACTER = "\"";
32+
/**
33+
* An SSL file location constant used to store the SSL certificate.
34+
* The file location is fixed in the /tmp directory
35+
* and is used to retrieve the SSL certificate location.
36+
*/
37+
public static final String SSL_CERT_FILE_LOCATION = "SSL_CERT_FILE";
38+
public static final String SSL_CERT_FILE_LOCATION_VALUE = "/tmp/cacert.pem";
39+
public static final String SNOWFLAKE_SPLIT_QUERY_ID = "query_id";
40+
public static final String SNOWFLAKE_SPLIT_EXPORT_BUCKET = "exportBucket";
41+
public static final String SNOWFLAKE_SPLIT_OBJECT_KEY = "s3ObjectKey";
3842

3943
private SnowflakeConstants() {}
4044
}

0 commit comments

Comments
 (0)