Skip to content

Commit a262aed

Browse files
committed
changes as per review comments
1 parent 2313d91 commit a262aed

File tree

7 files changed

+171
-91
lines changed

7 files changed

+171
-91
lines changed

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@
511511
<dependency>
512512
<groupId>software.amazon.encryption.s3</groupId>
513513
<artifactId>amazon-s3-encryption-client-java</artifactId>
514-
<scope>compile</scope>
514+
<scope>provided</scope>
515515
</dependency>
516516
<dependency>
517517
<groupId>software.amazon.eventstream</groupId>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -96,37 +96,6 @@ public S3Client createS3Client(
9696
.build();
9797
}
9898

99-
@Override
100-
public S3Client createS3EncryptionClient(final S3AsyncClient s3AsyncClient,
101-
final S3Client s3Client, final CSEMaterials cseMaterials) {
102-
103-
S3EncryptionClient.Builder s3EncryptionClientBuilder =
104-
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
105-
.enableLegacyUnauthenticatedModes(true);
106-
107-
if (cseMaterials.getCseKeyType().equals(CSEMaterials.CSEKeyType.KMS)) {
108-
s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
109-
}
110-
111-
return s3EncryptionClientBuilder.build();
112-
}
113-
114-
115-
@Override
116-
public S3AsyncClient createS3AsyncEncryptionClient(final S3AsyncClient s3AsyncClient,
117-
final CSEMaterials cseMaterials) {
118-
119-
S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
120-
S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
121-
.enableLegacyUnauthenticatedModes(true);
122-
123-
if (cseMaterials.getCseKeyType().equals(CSEMaterials.CSEKeyType.KMS)) {
124-
s3EncryptionAsyncClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
125-
}
126-
127-
return s3EncryptionAsyncClientBuilder.build();
128-
}
129-
13099
@Override
131100
public S3AsyncClient createS3AsyncClient(
132101
final URI uri,
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a;
20+
21+
import java.io.IOException;
22+
import java.net.URI;
23+
24+
import software.amazon.awssdk.services.s3.S3AsyncClient;
25+
import software.amazon.awssdk.services.s3.S3Client;
26+
import software.amazon.encryption.s3.S3AsyncEncryptionClient;
27+
import software.amazon.encryption.s3.S3EncryptionClient;
28+
29+
import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
30+
31+
import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
32+
33+
public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
34+
35+
private static final String ENCRYPTION_CLIENT_CLASSNAME =
36+
"software.amazon.encryption.s3.S3EncryptionClient";
37+
38+
/**
39+
* Encryption client availability.
40+
*/
41+
private static final boolean ENCRYPTION_CLIENT_FOUND = checkForEncryptionClient();
42+
43+
/**
44+
* S3Client to be wrapped by encryption client.
45+
*/
46+
private S3Client s3Client;
47+
48+
/**
49+
* S3AsyncClient to be wrapped by encryption client.
50+
*/
51+
private S3AsyncClient s3AsyncClient;
52+
53+
private static boolean checkForEncryptionClient() {
54+
try {
55+
ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
56+
cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
57+
LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME);
58+
return true;
59+
} catch (Exception e) {
60+
LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e);
61+
return false;
62+
}
63+
}
64+
65+
/**
66+
* Is the Encryption client available?
67+
* @return true if it was found in the classloader
68+
*/
69+
private static synchronized boolean isEncryptionClientAvailable() {
70+
return ENCRYPTION_CLIENT_FOUND;
71+
}
72+
73+
74+
@Override
75+
public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters) throws IOException {
76+
77+
if (!isEncryptionClientAvailable()) {
78+
throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, "No encryption client available");
79+
}
80+
81+
s3Client = super.createS3Client(uri, parameters);
82+
s3AsyncClient = super.createS3AsyncClient(uri, parameters);
83+
84+
return createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
85+
}
86+
87+
@Override
88+
public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters) throws IOException {
89+
90+
if (!isEncryptionClientAvailable()) {
91+
throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, "No encryption client available");
92+
}
93+
94+
return createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
95+
}
96+
97+
private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
98+
S3EncryptionClient.Builder s3EncryptionClientBuilder =
99+
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
100+
.enableLegacyUnauthenticatedModes(true);
101+
102+
if (cseMaterials.getCseKeyType().equals(CSEMaterials.CSEKeyType.KMS)) {
103+
s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
104+
}
105+
106+
return s3EncryptionClientBuilder.build();
107+
}
108+
109+
110+
private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials cseMaterials) {
111+
112+
S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
113+
S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
114+
.enableLegacyUnauthenticatedModes(true);
115+
116+
if (cseMaterials.getCseKeyType().equals(CSEMaterials.CSEKeyType.KMS)) {
117+
s3EncryptionAsyncClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
118+
}
119+
120+
return s3EncryptionAsyncClientBuilder.build();
121+
}
122+
123+
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@
9797
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
9898
import software.amazon.awssdk.transfer.s3.model.FileUpload;
9999
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
100-
import software.amazon.encryption.s3.materials.KmsKeyring;
101100

102101
import org.apache.commons.lang3.StringUtils;
103102
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@@ -982,6 +981,22 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
982981

983982
Region region = getS3Region(configuredRegion);
984983

984+
985+
S3ClientFactory clientFactory;
986+
CSEMaterials cseMaterials = null;
987+
988+
if (isCSEEnabled) {
989+
String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
990+
991+
cseMaterials = new CSEMaterials()
992+
.withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
993+
.withKmsKeyId(kmsKeyId);
994+
995+
clientFactory = ReflectionUtils.newInstance(EncryptionS3ClientFactory.class, conf);
996+
} else {
997+
clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
998+
}
999+
9851000
S3ClientFactory.S3ClientCreationParameters parameters =
9861001
new S3ClientFactory.S3ClientCreationParameters()
9871002
.withCredentialSet(credentials)
@@ -997,24 +1012,12 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
9971012
.withMultipartThreshold(multiPartThreshold)
9981013
.withTransferManagerExecutor(unboundedThreadPool)
9991014
.withRegion(region)
1000-
.withClientSideEncryptionEnabled(isCSEEnabled);
1015+
.withClientSideEncryptionEnabled(isCSEEnabled)
1016+
.withClientSideEncryptionMaterials(cseMaterials);
1017+
10011018

1002-
S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
10031019
s3Client = clientFactory.createS3Client(getUri(), parameters);
10041020
createS3AsyncClient(clientFactory, parameters);
1005-
1006-
if (isCSEEnabled) {
1007-
String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
1008-
1009-
CSEMaterials cseMaterials = new CSEMaterials()
1010-
.withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
1011-
.withKmsKeyId(kmsKeyId);
1012-
1013-
1014-
s3Client = clientFactory.createS3EncryptionClient(s3AsyncClient, s3Client, cseMaterials);
1015-
s3AsyncClient = clientFactory.createS3AsyncEncryptionClient(s3AsyncClient, cseMaterials);
1016-
}
1017-
10181021
transferManager = clientFactory.createS3TransferManager(getS3AsyncClient());
10191022
}
10201023

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,30 +84,6 @@ S3AsyncClient createS3AsyncClient(URI uri,
8484
S3ClientCreationParameters parameters) throws IOException;
8585

8686

87-
/**
88-
* Creates a new {@link software.amazon.encryption.s3.S3EncryptionClient}.
89-
* Used when client side encryption is enabled.
90-
*
91-
* @param s3AsyncClient The asynchronous S3 client, will be used for cryptographic operations.
92-
* @param s3Client The synchronous S3 client, will be used for non cryptographic operations.
93-
* @param cseMaterials cse key and type to use
94-
* @return S3EncryptionClient
95-
*/
96-
S3Client createS3EncryptionClient(S3AsyncClient s3AsyncClient, S3Client s3Client,
97-
CSEMaterials cseMaterials);
98-
99-
100-
/**
101-
* Creates a new {@link software.amazon.encryption.s3.S3AsyncEncryptionClient}.
102-
* Used when client side encryption is enabled.
103-
*
104-
* @param s3AsyncClient The asynchronous S3 client, will be used for cryptographic operations.
105-
* @param cseMaterials cse key and type to use
106-
* @return S3AsyncEncryptionClient
107-
*/
108-
S3AsyncClient createS3AsyncEncryptionClient(S3AsyncClient s3AsyncClient, CSEMaterials cseMaterials);
109-
110-
11187
/**
11288
* Creates a new {@link S3TransferManager}.
11389
*
@@ -202,6 +178,10 @@ final class S3ClientCreationParameters {
202178
*/
203179
private Boolean isCSEEnabled;
204180

181+
/**
182+
* Client side encryption materials.
183+
*/
184+
private CSEMaterials cseMaterials;
205185

206186
/**
207187
* List of execution interceptors to include in the chain
@@ -473,5 +453,24 @@ public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean
473453
public boolean isClientSideEncryptionEnabled() {
474454
return this.isCSEEnabled;
475455
}
456+
457+
/**
458+
* Set the client side encryption materials.
459+
*
460+
* @param value new value
461+
* @return the builder
462+
*/
463+
public S3ClientCreationParameters withClientSideEncryptionMaterials(final CSEMaterials value) {
464+
this.cseMaterials = value;
465+
return this;
466+
}
467+
468+
/**
469+
* Get the client side encryption materials.
470+
* @return client side encryption materials
471+
*/
472+
public CSEMaterials getClientSideEncryptionMaterials() {
473+
return this.cseMaterials;
474+
}
476475
}
477476
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
365365
throws Exception {
366366

367367
conf = new Configuration();
368-
unsetClientSideConfiguration(conf);
368+
unsetEncryption(conf);
369369
conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
370370
assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
371371

@@ -404,7 +404,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
404404
@Test
405405
public void testDefaultUserAgent() throws Exception {
406406
conf = new Configuration();
407-
unsetClientSideConfiguration(conf);
407+
unsetEncryption(conf);
408408
fs = S3ATestUtils.createTestFileSystem(conf);
409409
assertNotNull(fs);
410410
S3Client s3 = getS3Client("User Agent");
@@ -418,7 +418,7 @@ public void testDefaultUserAgent() throws Exception {
418418
@Test
419419
public void testCustomUserAgent() throws Exception {
420420
conf = new Configuration();
421-
unsetClientSideConfiguration(conf);
421+
unsetEncryption(conf);
422422
conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
423423
fs = S3ATestUtils.createTestFileSystem(conf);
424424
assertNotNull(fs);
@@ -434,7 +434,7 @@ public void testCustomUserAgent() throws Exception {
434434
public void testRequestTimeout() throws Exception {
435435
conf = new Configuration();
436436
conf.set(REQUEST_TIMEOUT, "120");
437-
unsetClientSideConfiguration(conf);
437+
unsetEncryption(conf);
438438
fs = S3ATestUtils.createTestFileSystem(conf);
439439
S3Client s3 = getS3Client("Request timeout (ms)");
440440
SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class,
@@ -579,7 +579,7 @@ public void testS3SpecificSignerOverride() throws Exception {
579579
.describedAs("Custom STS signer not called").isTrue();
580580
}
581581

582-
private void unsetClientSideConfiguration(Configuration conf) {
582+
private void unsetEncryption(Configuration conf) {
583583
removeBaseAndBucketOverrides(conf, S3_ENCRYPTION_ALGORITHM);
584584
conf.set(Constants.S3_ENCRYPTION_ALGORITHM,
585585
S3AEncryptionMethods.NONE.getMethod());

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,6 @@ public S3AsyncClient createS3AsyncClient(URI uri, final S3ClientCreationParamete
6363
return s3;
6464
}
6565

66-
@Override
67-
public S3Client createS3EncryptionClient(final S3AsyncClient s3AsyncClient,
68-
final S3Client s3Client, final CSEMaterials cseMaterials) {
69-
S3Client s3 = mock(S3Client.class);
70-
return s3;
71-
}
72-
73-
@Override
74-
public S3AsyncClient createS3AsyncEncryptionClient(final S3AsyncClient s3AsyncClient,
75-
final CSEMaterials cseMaterials) {
76-
S3AsyncClient s3 = mock(S3AsyncClient.class);
77-
return s3;
78-
}
79-
8066

8167
@Override
8268
public S3TransferManager createS3TransferManager(S3AsyncClient s3AsyncClient) {

0 commit comments

Comments
 (0)