
Commit 73da94a

SSEC integration changes
1 parent a21927a commit 73da94a

File tree: 6 files changed (+105 −7 lines)

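Taken together, these changes let the analytics (S3 Analytics Accelerator) stream read SSE-C encrypted objects: the customer key travels from the filesystem's encryption secrets through ObjectReadParameters into the accelerator's OpenStreamInformation. Below is a minimal client-side sketch of the path this enables; the encryption property names match the S3_ENCRYPTION_ALGORITHM and S3_ENCRYPTION_KEY constants used in the tests, while the stream-type property name, bucket, and paths are illustrative assumptions.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SseCAnalyticsReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // SSE-C: the same base64-encoded 256-bit key must be supplied on read as on write.
    conf.set("fs.s3a.encryption.algorithm", "SSE-C");
    conf.set("fs.s3a.encryption.key", "<base64-encoded 256-bit key>");
    // Assumed property name for selecting the analytics input stream.
    conf.set("fs.s3a.input.stream.type", "analytics");

    try (FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
         FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data/file.parquet"))) {
      // With this commit, the SSE-C key reaches the accelerator's OpenStreamInformation,
      // so reads of SSE-C encrypted objects no longer fail with the analytics stream.
      byte[] buffer = new byte[4096];
      int bytesRead = in.read(buffer);
      System.out.println("Read " + bytesRead + " bytes");
    }
  }
}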

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 2 additions & 1 deletion
@@ -1920,7 +1920,8 @@ private FSDataInputStream executeOpen(
         .withCallbacks(createInputStreamCallbacks(auditSpan))
         .withContext(readContext.build())
         .withObjectAttributes(createObjectAttributes(path, fileStatus))
-        .withStreamStatistics(inputStreamStats);
+        .withStreamStatistics(inputStreamStats)
+        .withEncryptionSecrets(getEncryptionSecrets());
     return new FSDataInputStream(getStore().readObject(parameters));
   }

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 10 additions & 0 deletions
@@ -21,9 +21,13 @@

 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Optional;

+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
@@ -205,6 +209,12 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
           .etag(parameters.getObjectAttributes().getETag()).build());
     }

+    if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
+      EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
+          .ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
+              EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
+    }
+
     return openStreamInformationBuilder.build();
   }
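Two distinct EncryptionSecrets types meet at this boundary: the Hadoop-side org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets carried in ObjectReadParameters, and the accelerator's software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets attached to OpenStreamInformation. The key is re-wrapped only for SSE-C, the one algorithm where the client must present the customer key on every GET; EncryptionSecretOperations.getSSECustomerKey() returns an Optional, so nothing is attached when no key is configured.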

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java

Lines changed: 25 additions & 0 deletions
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;

 import static java.util.Objects.requireNonNull;
@@ -69,6 +70,29 @@ public final class ObjectReadParameters {
    */
   private LocalDirAllocator directoryAllocator;

+  /**
+   * Encryption secrets for this stream.
+   */
+  private EncryptionSecrets encryptionSecrets;
+
+  /**
+   * Getter.
+   * @return Encryption secrets.
+   */
+  public EncryptionSecrets getEncryptionSecrets() {
+    return encryptionSecrets;
+  }
+
+  /**
+   * Set encryption secrets.
+   * @param value new value
+   * @return the builder
+   */
+  public ObjectReadParameters withEncryptionSecrets(final EncryptionSecrets value) {
+    encryptionSecrets = value;
+    return this;
+  }
+
   /**
    * @return Read operation context.
    */
@@ -185,6 +209,7 @@ public ObjectReadParameters validate() {
     requireNonNull(directoryAllocator, "directoryAllocator");
     requireNonNull(objectAttributes, "objectAttributes");
     requireNonNull(streamStatistics, "streamStatistics");
+    requireNonNull(encryptionSecrets, "encryptionSecrets");
     return this;
   }
 }

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 68 additions & 0 deletions
@@ -22,7 +22,10 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;

+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.fs.s3a.impl.streams.AnalyticsStream;
 import org.junit.Before;
 import org.junit.Test;
 import org.assertj.core.api.Assertions;
@@ -36,14 +39,21 @@
 import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;

+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;

 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
@@ -64,6 +74,7 @@
 public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase {

   private static final String PHYSICAL_IO_PREFIX = "physicalio";
+  private static final String SSEC_KEY = "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=";

   private Path externalTestFile;

@@ -194,4 +205,61 @@ public void testInvalidConfigurationThrows() throws Exception {
         () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }

+  /**
+   * Verifies that the OpenStreamInformation object contains the correct encryption
+   * settings when reading an SSE-C encrypted file with the analytics stream.
+   *
+   * @throws Exception on any failure.
+   */
+  @Test
+  public void testAnalyticsStreamOpenStreamInfoWhenSSECEncryptionEnabled() throws Exception {
+    Configuration confSSEC = this.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(confSSEC);
+    removeBaseAndBucketOverrides(getTestBucketName(confSSEC), confSSEC,
+        S3_ENCRYPTION_ALGORITHM, S3_ENCRYPTION_KEY);
+
+    confSSEC.set(S3_ENCRYPTION_ALGORITHM, S3AEncryptionMethods.SSE_C.getMethod());
+    confSSEC.set(S3_ENCRYPTION_KEY, SSEC_KEY);
+
+    S3AContract contractSSEC = (S3AContract) createContract(confSSEC);
+    contractSSEC.init();
+    contractSSEC.setConf(confSSEC);
+    S3AFileSystem fileSystemSSEC = (S3AFileSystem) contractSSEC.getTestFileSystem();
+
+    int len = TEST_FILE_LEN;
+    describe("Create an encrypted file and verify OpenStreamInformation encryption settings");
+    Path src = methodPath();
+    byte[] data = dataset(len, 'a', 'z');
+    writeDataset(fileSystemSSEC, src, data, len, 1024 * 1024, true);
+    // Read with encryption settings
+    OpenStreamInformation originalInfo;
+    try (FSDataInputStream in = fileSystemSSEC.open(src)) {
+      AnalyticsStream s3AInputStream = (AnalyticsStream) in.getWrappedStream();
+      Field inputStreamField = s3AInputStream.getClass().getDeclaredField("inputStream");
+      inputStreamField.setAccessible(true);
+      S3SeekableInputStream seekableInputStream =
+          (S3SeekableInputStream) inputStreamField.get(s3AInputStream);
+
+      Field logicalIOField = seekableInputStream.getClass().getDeclaredField("logicalIO");
+      logicalIOField.setAccessible(true);
+      Object logicalIO = logicalIOField.get(seekableInputStream);
+
+      Field physicalIOField = logicalIO.getClass().getDeclaredField("physicalIO");
+      physicalIOField.setAccessible(true);
+      Object physicalIO = physicalIOField.get(logicalIO);
+
+      Field openStreamInfoField = physicalIO.getClass().getDeclaredField("openStreamInformation");
+      openStreamInfoField.setAccessible(true);
+      originalInfo = (OpenStreamInformation) openStreamInfoField.get(physicalIO);
+
+      // Verify the SSE-C customer key was propagated to the OpenStreamInformation
+      assertNotNull("Encryption secrets should be set", originalInfo.getEncryptionSecrets());
+      assertEquals(SSEC_KEY, originalInfo.getEncryptionSecrets().getSsecCustomerKey().get());
+    }
+
+    // Clean up
+    fileSystemSSEC.delete(src, false);
+  }
+
 }

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java

Lines changed: 0 additions & 3 deletions
@@ -43,7 +43,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;

 /**
@@ -96,8 +95,6 @@ protected Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support SSE-C");
     assumeEnabled();
     // although not a root dir test, this confuses paths enough it shouldn't be run in
     // parallel with other jobs

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java

Lines changed: 0 additions & 3 deletions
@@ -31,7 +31,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;

 /**
@@ -55,8 +54,6 @@ public class ITestS3AHugeFilesSSECDiskBlocks
   public void setup() throws Exception {
     try {
       super.setup();
-      skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-          "Analytics Accelerator currently does not support SSE-C");
     } catch (AccessDeniedException | AWSUnsupportedFeatureException e) {
       skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method");
     }
