Skip to content

Commit db01524

Browse files
committed
Address review feedback v2
1 parent 28714ac commit db01524

32 files changed

+126
-97
lines changed

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@
528528
<dependency>
529529
<groupId>software.amazon.s3.analyticsaccelerator</groupId>
530530
<artifactId>analyticsaccelerator-s3</artifactId>
531-
<version>0.0.1</version>
531+
<version>0.0.2</version>
532532
<scope>compile</scope>
533533
</dependency>
534534
<dependency>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
326326

327327
/**
328328
* CRT-Based S3Client created of analytics accelerator library is enabled
329-
* and managed by the ClientManager. Analytics accelerator library can be
329+
* and managed by the S3AStoreImpl. Analytics accelerator library can be
330330
* enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
331331
*/
332332
private S3AsyncClient s3AsyncClient;
@@ -545,7 +545,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
545545
private boolean s3AccessGrantsEnabled;
546546

547547
/**
548-
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true
548+
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
549549
*/
550550
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
551551

@@ -705,8 +705,10 @@ public void initialize(URI name, Configuration originalConf)
705705
this.prefetchBlockCount =
706706
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
707707

708-
this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
709-
this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
708+
this.analyticsAcceleratorEnabled =
709+
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
710+
this.analyticsAcceleratorCRTEnabled =
711+
conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
710712

711713
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
712714
DEFAULT_MULTIPART_UPLOAD_ENABLED);
@@ -847,10 +849,10 @@ public void initialize(URI name, Configuration originalConf)
847849
if (this.analyticsAcceleratorEnabled) {
848850
LOG.info("Using S3SeekableInputStream");
849851
if(this.analyticsAcceleratorCRTEnabled) {
850-
LOG.info("Using S3CrtClient");
852+
LOG.info("Using S3 CRT client for analytics accelerator S3");
851853
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
852854
} else {
853-
LOG.info("Using S3Client");
855+
LOG.info("Using S3 async client for analytics accelerator S3");
854856
this.s3AsyncClient = store.getOrCreateAsyncClient();
855857
}
856858

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class S3ASeekableStream extends FSInputStream implements StreamCapabiliti
3838
private S3SeekableInputStream inputStream;
3939
private long lastReadCurrentPos = 0;
4040
private final String key;
41+
private volatile boolean closed;
4142

4243
public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
4344

@@ -84,7 +85,7 @@ public void seek(long pos) throws IOException {
8485

8586
@Override
8687
public synchronized long getPos() {
87-
if (!isClosed()) {
88+
if (!closed) {
8889
lastReadCurrentPos = inputStream.getPos();
8990
}
9091
return lastReadCurrentPos;
@@ -139,8 +140,9 @@ public int available() throws IOException {
139140
}
140141

141142
@Override
142-
public void close() throws IOException {
143-
if (inputStream != null) {
143+
public synchronized void close() throws IOException {
144+
if(!closed) {
145+
closed = true;
144146
try {
145147
inputStream.close();
146148
inputStream = null;
@@ -174,12 +176,8 @@ private void onReadFailure(IOException ioe) throws IOException {
174176

175177

176178
protected void throwIfClosed() throws IOException {
177-
if (isClosed()) {
179+
if (closed) {
178180
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
179181
}
180182
}
181-
182-
protected boolean isClosed() {
183-
return inputStream == null;
184-
}
185183
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ protected Configuration createConfiguration() {
8989

9090
@Override
9191
public void testOverwriteExistingFile() throws Throwable {
92-
skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
92+
// Will remove this when Analytics Accelerator supports overwrites
93+
skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
94+
"Analytics Accelerator does not support overwrites yet");
9395
super.testOverwriteExistingFile();
9496
}
9597

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,6 @@ protected Configuration createConfiguration() {
5252
return newConf;
5353
}
5454

55-
@Override
56-
public void setup() throws Exception {
57-
super.setup();
58-
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
59-
}
60-
6155
@Override
6256
protected boolean shouldUseDirectWrite() {
6357
return true;
@@ -85,6 +79,14 @@ public void testNonDirectWrite() throws Exception {
8579
getRenameOperationCount() - renames);
8680
}
8781

82+
@Override
83+
public void testDistCpUpdateCheckFileSkip() throws Exception {
84+
//Will remove this when Analytics Accelerator supports overwrites
85+
skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
86+
"Analytics Accelerator Library does not support update to existing files");
87+
super.testDistCpUpdateCheckFileSkip();
88+
}
89+
8890
private long getRenameOperationCount() {
8991
return getFileSystem().getStorageStatistics()
9092
.getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public class ITestS3AContractRename extends AbstractContractRenameTest {
4949
@Override
5050
public void setup() throws Exception {
5151
super.setup();
52-
skipIfAnalyticsAcceleratorEnabled(getContract().getConf());
52+
skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
53+
"Analytics Accelerator does not support rename");
5354

5455
}
5556
@Override

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ protected AbstractFSContract createContract(Configuration conf) {
8686
}
8787

8888
/**
89-
* Analytics Accelerator Library for Amazon S3 does not support Vectored Reads
89+
* Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
9090
* @throws Exception
9191
*/
9292
@Override
9393
public void setup() throws Exception {
94-
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
94+
skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
95+
"Analytics Accelerator does not support vectored reads");
9596
super.setup();
9697
}
9798

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
3737
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
3838
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
39+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
3940

4041
/**
4142
* Tests behavior of a FileNotFound error that happens after open(), i.e. on
@@ -65,6 +66,8 @@ protected Configuration createConfiguration() {
6566
*/
6667
@Test
6768
public void testNotFoundFirstRead() throws Exception {
69+
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
70+
"Temporarily disabling to fix Exception handling on Analytics Accelerator");
6871
S3AFileSystem fs = getFileSystem();
6972
ChangeDetectionPolicy changeDetectionPolicy =
7073
fs.getChangeDetectionPolicy();

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ protected Configuration createConfiguration() {
9191
@Override
9292
public void setup() throws Exception {
9393
super.setup();
94-
skipIfAnalyticsAcceleratorEnabled(getConfiguration());
94+
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
95+
"Analytics Accelerator does not support SSEC");
9596
assumeEnabled();
9697
// although not a root dir test, this confuses paths enough it shouldn't be run in
9798
// parallel with other jobs

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,19 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
7878

7979
@Override
8080
public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
81-
//Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
82-
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf());
81+
// Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
82+
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
83+
"Analytics Accelerator does not support overwrites");
8384
}
8485

8586
@Override
8687
public void testWriteReadAndDeleteTwoBlocks() throws Exception {
87-
//Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
88-
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf());
88+
// Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
89+
skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
90+
"Analytics Accelerator does not support overwrites");
8991
}
9092

91-
@Override
93+
@Override
9294
public void testOverwrite() throws IOException {
9395
boolean createPerformance = isCreatePerformanceEnabled(fSys);
9496
try {

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,9 @@ public void testOverwrite() throws IOException {
162162

163163
@Override
164164
public void testOverWriteAndRead() throws Exception {
165-
skipIfAnalyticsAcceleratorEnabled(fs.getConf());
165+
//Will remove this when Analytics Accelerator supports overwrites
166+
skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
167+
"Analytics Accelerator does not support overwrites");
166168
super.testOverWriteAndRead();
167169
}
168170
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ protected Configuration createConfiguration() {
7878
public void setup() throws Exception {
7979
super.setup();
8080
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
81-
skipIfAnalyticsAcceleratorEnabled(getContract().getConf());
81+
// TODO: Add IOStatistics Support to S3SeekableStream
82+
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
83+
"S3SeekableStream does not support IOStatisticsContext");
8284

8385
}
8486

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ public void setup() throws Exception {
9090
@Test
9191
public void testFinalizer() throws Throwable {
9292
Path path = methodPath();
93-
skipIfAnalyticsAcceleratorEnabled(getConfiguration());
93+
// TODO: Add Leak Detection to S3SeekableStream
94+
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
95+
"S3SeekableStream does not support leak detection");
9496

9597
final S3AFileSystem fs = getFileSystem();
9698

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ public void testMetricsRegister()
5252

5353
@Test
5454
public void testStreamStatistics() throws IOException {
55-
skipIfAnalyticsAcceleratorEnabled(getConfiguration());
55+
// TODO: Add StreamStatistics support to S3SeekableStream
56+
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
57+
"S3SeekableStream does not support stream statistics");
5658

5759
S3AFileSystem fs = getFileSystem();
5860
Path file = path("testStreamStatistics");

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,7 @@
3737
import org.apache.hadoop.fs.permission.FsAction;
3838
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
3939

40-
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
41-
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
42-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
43-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
44-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
40+
import static org.apache.hadoop.fs.s3a.Constants.*;
4541
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
4642
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
4743
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -78,7 +74,6 @@ public void setUp() throws Exception {
7874
super.setup();
7975
// Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
8076
conf = createConfiguration();
81-
skipIfAnalyticsAcceleratorEnabled(conf);
8277
testFile = getExternalData(conf);
8378
prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
8479
fs = FileSystem.get(testFile.toUri(), conf);
@@ -99,6 +94,8 @@ public Configuration createConfiguration() {
9994
final String bufferDirBase = configuration.get(BUFFER_DIR);
10095
bufferDir = bufferDirBase + "/" + UUID.randomUUID();
10196
configuration.set(BUFFER_DIR, bufferDir);
97+
// When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
98+
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
10299
return configuration;
103100
}
104101

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@
3434
import org.apache.hadoop.fs.statistics.IOStatistics;
3535
import org.apache.hadoop.test.LambdaTestUtils;
3636

37-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
38-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
39-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
37+
import static org.apache.hadoop.fs.s3a.Constants.*;
4038
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
4139
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
4240
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
@@ -76,11 +74,6 @@ public ITestS3APrefetchingInputStream() {
7674
private static final int INTERVAL_MILLIS = 500;
7775
private static final int BLOCK_SIZE = S_1K * 10;
7876

79-
@Override
80-
public void setup() throws Exception {
81-
super.setup();
82-
skipIfAnalyticsAcceleratorEnabled(this.createConfiguration());
83-
}
8477

8578
@Override
8679
public Configuration createConfiguration() {
@@ -89,6 +82,8 @@ public Configuration createConfiguration() {
8982
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
9083
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
9184
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
85+
// When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
86+
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
9287
return conf;
9388
}
9489

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,7 @@
4242
import org.apache.hadoop.fs.statistics.IOStatistics;
4343
import org.apache.hadoop.test.LambdaTestUtils;
4444

45-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
46-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
47-
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
48-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
45+
import static org.apache.hadoop.fs.s3a.Constants.*;
4946
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
5047
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
5148
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
@@ -69,12 +66,6 @@ public static Collection<Object[]> params() {
6966
});
7067
}
7168

72-
@Override
73-
public void setup() throws Exception {
74-
super.setup();
75-
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
76-
}
77-
7869
public ITestS3APrefetchingLruEviction(final String maxBlocks) {
7970
super(true);
8071
this.maxBlocks = maxBlocks;
@@ -100,6 +91,8 @@ public Configuration createConfiguration() {
10091
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
10192
conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
10293
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
94+
// When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
95+
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
10396
return conf;
10497
}
10598

0 commit comments

Comments
 (0)