
Commit 63daf56

fuatbasik authored and ahmarsuhail committed
HADOOP-19348. Add initial support for Analytics Accelerator Library for Amazon S3 (#7192)
1 parent eadf0dd commit 63daf56

33 files changed: +617 −29 lines

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 11 additions & 0 deletions
@@ -472,6 +472,17 @@
       <artifactId>amazon-s3-encryption-client-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+      <artifactId>analyticsaccelerator-s3</artifactId>
+      <version>0.0.2</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk.crt</groupId>
+      <artifactId>aws-crt</artifactId>
+      <version>0.29.10</version>
+    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
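
The aws-crt artifact ships native bindings, so it can be worth confirming the CRT classes actually made it onto the runtime classpath before enabling the CRT client. A minimal sketch, not part of this commit; the probe simply loads the aws-crt entry-point class and treats any linkage failure as absence:

// Illustrative classpath probe (not part of this commit).
public final class CrtAvailabilityCheck {
  private CrtAvailabilityCheck() {
  }

  /** @return true if the aws-crt entry-point class loads cleanly. */
  public static boolean crtOnClasspath() {
    try {
      // Loading the class also triggers native library initialization.
      Class.forName("software.amazon.awssdk.crt.CRT");
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }
}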

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 34 additions & 0 deletions
@@ -1827,4 +1827,38 @@ private Constants() {
    * Value: {@value}.
    */
   public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+  /**
+   * Prefix used to configure the Analytics Accelerator Library.
+   */
+  public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
+      "fs.s3a.analytics.accelerator";
+
+  /**
+   * Config to enable the Analytics Accelerator Library for Amazon S3.
+   * https://github.com/awslabs/analytics-accelerator-s3
+   */
+  public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
+      ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";
+
+  /**
+   * Config to enable use of the CRT client with the Analytics Accelerator Library.
+   * Defaults to true.
+   */
+  public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
+      "fs.s3a.analytics.accelerator.crt.client";
+
+  /**
+   * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY}.
+   * Value: {@value}.
+   */
+  public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
+
+  /**
+   * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED}.
+   * Value: {@value}.
+   */
+  public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
+
 }
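
Taken together, the new constants give a two-key switch. A minimal sketch of enabling the library from client code, assuming a standard Hadoop Configuration; the keys are exactly the ones defined above:

import org.apache.hadoop.conf.Configuration;

// Enable Analytics Accelerator; the CRT client stays on by default.
Configuration conf = new Configuration();
conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
// Optional: use the standard S3 async client instead of the CRT client.
// Note this currently also disables multipart upload (see S3AFileSystem below).
// conf.setBoolean("fs.s3a.analytics.accelerator.crt.client", false);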

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 65 additions & 2 deletions
@@ -52,7 +52,9 @@
 import javax.annotation.Nullable;

 import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;

@@ -83,6 +85,11 @@
 import software.amazon.awssdk.transfer.s3.model.Copy;
 import software.amazon.awssdk.transfer.s3.model.CopyRequest;

+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -309,6 +316,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private S3Client s3Client;

+  /**
+   * CRT-based S3 client, created if the analytics accelerator library is enabled
+   * and the CRT client is selected; otherwise the async client managed by the
+   * S3AStoreImpl is used. The analytics accelerator library can be
+   * enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}.
+   */
+  private S3AsyncClient s3AsyncClient;
+
   // initial callback policy is fail-once; it's there just to assist
   // some mock tests and other codepaths trying to call the low level
   // APIs on an uninitialized filesystem.

@@ -335,6 +349,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private ExecutorServiceFuturePool futurePool;

+  // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
+  private boolean analyticsAcceleratorEnabled;
+
+  private boolean analyticsAcceleratorCRTEnabled;
+
   private int executorCapacity;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);

@@ -497,6 +517,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean s3AccessGrantsEnabled;

+  /**
+   * Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
+   */
+  private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {

@@ -643,8 +668,21 @@ public void initialize(URI name, Configuration originalConf)
     dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
         s3ExpressStore);

+    this.analyticsAcceleratorEnabled =
+        conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+    this.analyticsAcceleratorCRTEnabled =
+        conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
+            ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+
     this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
-        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+        DEFAULT_MULTIPART_UPLOAD_ENABLED);
+
+    if (this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+      // Temporary: Analytics Accelerator with S3AsyncClient does not support multipart upload.
+      this.isMultipartUploadEnabled = false;
+    }
+
     // multipart copy and upload are the same; this just makes it explicit
     this.isMultipartCopyEnabled = isMultipartUploadEnabled;

@@ -771,6 +809,26 @@ public void initialize(URI name, Configuration originalConf)
     // thread pool init requires store to be created
     initThreadPools();

+    if (this.analyticsAcceleratorEnabled) {
+      LOG.info("Using S3SeekableInputStream");
+      if (this.analyticsAcceleratorCRTEnabled) {
+        LOG.info("Using S3 CRT client for analytics accelerator S3");
+        this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
+      } else {
+        LOG.info("Using S3 async client for analytics accelerator S3");
+        this.s3AsyncClient = store.getOrCreateAsyncClient();
+      }
+
+      ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+      S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
+          S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+      this.s3SeekableInputStreamFactory =
+          new S3SeekableInputStreamFactory(
+              new S3SdkObjectClient(this.s3AsyncClient),
+              seekableInputStreamConfiguration);
+    }
+
     // The filesystem is now ready to perform operations against
     // S3
     // This initiates a probe against S3 for the bucket existing.

@@ -1822,6 +1880,7 @@ private FSDataInputStream executeOpen(
       final Path path,
       final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
+
     // create the input stream statistics before opening
     // the file so that the time to prepare to open the file is included.
     S3AInputStreamStatistics inputStreamStats =

@@ -4257,9 +4316,13 @@ public void close() throws IOException {
   protected synchronized void stopAllServices() {
     try {
       trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
-        closeAutocloseables(LOG, getStore());
+
+        closeAutocloseables(LOG, getStore(), s3SeekableInputStreamFactory);
+
         store = null;
         s3Client = null;
+        s3AsyncClient = null;
+        s3SeekableInputStreamFactory = null;

         // At this point the S3A client is shut down,
         // now the executor pools are closed
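
End to end, setting the key picked up during initialize() means subsequent open() calls are served by the new stream factory. A hedged sketch, assuming the open() path is wired to the factory as this commit intends; bucket and object names are placeholders:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
try (FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data/part-00000.parquet"))) {
  in.seek(1024);              // seek is delegated to the seekable stream
  int firstByte = in.read();  // reads go through the analytics accelerator
}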
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java

Lines changed: 183 additions & 0 deletions

@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.StreamCapabilities;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+
+  private S3SeekableInputStream inputStream;
+  private long lastReadCurrentPos = 0;
+  private final String key;
+  private volatile boolean closed;
+
+  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+
+  public S3ASeekableStream(String bucket, String key,
+      S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+    this.key = key;
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return false;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read();
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throwIfClosed();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+          + " " + pos);
+    }
+    inputStream.seek(pos);
+  }
+
+  @Override
+  public synchronized long getPos() {
+    if (!closed) {
+      lastReadCurrentPos = inputStream.getPos();
+    }
+    return lastReadCurrentPos;
+  }
+
+  /**
+   * Reads the last n bytes from the stream into a byte buffer. Blocks until the end
+   * of the stream is reached. Leaves the position of the stream unaltered.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+   * @return the total number of bytes read into the buffer
+   * @throws IOException if an I/O error occurs
+   */
+  public int readTail(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.readTail(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    throwIfClosed();
+    return super.available();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      try {
+        inputStream.close();
+        inputStream = null;
+        super.close();
+      } catch (IOException ioe) {
+        LOG.debug("Failure closing stream {}", key, ioe);
+        throw ioe;
+      }
+    }
+  }
+
+  /**
+   * Close the stream on read failure.
+   * No attempt is made to recover from the failure.
+   *
+   * @param ioe exception caught.
+   */
+  @Retries.OnceTranslated
+  private void onReadFailure(IOException ioe) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got exception while trying to read from stream {}, " +
+          "not trying to recover:", key, ioe);
+    } else {
+      LOG.info("Got exception while trying to read from stream {}, " +
+          "not trying to recover:", key, ioe);
+    }
+    this.close();
+  }
+
+  protected void throwIfClosed() throws IOException {
+    if (closed) {
+      throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+}
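
The readTail() method is the piece columnar readers care about: it fetches the last len bytes while leaving getPos() untouched. A hedged usage sketch; it assumes the wrapped stream really is an S3ASeekableStream (i.e. the feature is enabled), and the helper name and footer length are placeholders:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3ASeekableStream;

/** Read the trailing footerLen bytes of a file without moving the stream position. */
static byte[] readFooter(FileSystem fs, Path path, int footerLen) throws IOException {
  try (FSDataInputStream in = fs.open(path)) {
    InputStream wrapped = in.getWrappedStream();
    if (!(wrapped instanceof S3ASeekableStream)) {
      throw new IOException("analytics accelerator stream not in use for " + path);
    }
    byte[] footer = new byte[footerLen];
    // Position is unchanged afterwards; regular read() picks up where it left off.
    ((S3ASeekableStream) wrapped).readTail(footer, 0, footer.length);
    return footer;
  }
}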
