Skip to content

Commit e56882e

Browse files
committed
Add exception handling. Fix Multi-part uploads
1 parent e38b482 commit e56882e

File tree

5 files changed

+72
-19
lines changed

5 files changed

+72
-19
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1776,7 +1776,7 @@ private Constants() {
17761776
/**
17771777
* Config to specify usage of the CRT client with the Analytics Accelerator Library for Amazon S3; enabled by default.
17781778
*/
1779-
public static final String USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR = "fs.s3a.analytics.accelerator.crt.client";
1779+
public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED = "fs.s3a.analytics.accelerator.crt.client";
17801780

17811781
/**
17821782
* Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY}.
@@ -1785,9 +1785,9 @@ private Constants() {
17851785
public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
17861786

17871787
/**
1788-
* Default value for {@link #USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR }
1788+
* Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED}.
17891789
* Value {@value}.
17901790
*/
1791-
public static final boolean USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT = true;
1791+
public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
17921792

17931793
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
361361
// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
362362
private boolean analyticsAcceleratorEnabled;
363363

364+
private boolean analyticsAcceleratorCRTEnabled;
365+
364366
// Size in bytes of a single prefetch block.
365367
private int prefetchBlockSize;
366368

@@ -704,13 +706,16 @@ public void initialize(URI name, Configuration originalConf)
704706
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
705707

706708
this.analyticsAcceleratorEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
709+
this.analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
707710

708-
if(!analyticsAcceleratorEnabled) {
709-
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
710-
DEFAULT_MULTIPART_UPLOAD_ENABLED);
711-
} else {
711+
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
712+
DEFAULT_MULTIPART_UPLOAD_ENABLED);
713+
714+
if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
715+
// Temp change: Analytics Accelerator with S3AsyncClient does not support multi-part uploads.
712716
this.isMultipartUploadEnabled = false;
713717
}
718+
714719
// multipart copy and upload are the same; this just makes it explicit
715720
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
716721

@@ -841,7 +846,7 @@ public void initialize(URI name, Configuration originalConf)
841846

842847
if (this.analyticsAcceleratorEnabled) {
843848
LOG.info("Using S3SeekableInputStream");
844-
if(conf.getBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR_DEFAULT)) {
849+
if(this.analyticsAcceleratorCRTEnabled) {
845850
LOG.info("Using S3CrtClient");
846851
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
847852
} else {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ public boolean hasCapability(String capability) {
6060
@Override
6161
public int read() throws IOException {
6262
throwIfClosed();
63-
return inputStream.read();
63+
int bytesRead;
64+
try {
65+
bytesRead = inputStream.read();
66+
} catch (IOException ioe){
67+
onReadFailure(ioe);
68+
throw ioe;
69+
}
70+
return bytesRead;
6471
}
6572

6673
@Override
@@ -89,18 +96,32 @@ public synchronized long getPos() {
8996
*
9097
* @param buf buffer to read data into
9198
* @param off start position in buffer at which data is written
92-
* @param n the number of bytes to read; the n-th byte should be the last byte of the stream.
99+
* @param len the number of bytes to read; the len-th byte read should be the last byte of the stream.
93100
* @return the total number of bytes read into the buffer
94101
*/
95-
public void readTail(byte[] buf, int off, int n) throws IOException {
102+
public int readTail(byte[] buf, int off, int len) throws IOException {
96103
throwIfClosed();
97-
inputStream.readTail(buf, off, n);
104+
int bytesRead;
105+
try {
106+
bytesRead = inputStream.readTail(buf, off, len);
107+
} catch (IOException ioe) {
108+
onReadFailure(ioe);
109+
throw ioe;
110+
}
111+
return bytesRead;
98112
}
99113

100114
@Override
101115
public int read(byte[] buf, int off, int len) throws IOException {
102116
throwIfClosed();
103-
return inputStream.read(buf, off, len);
117+
int bytesRead;
118+
try {
119+
bytesRead = inputStream.read(buf, off, len);
120+
} catch (IOException ioe) {
121+
onReadFailure(ioe);
122+
throw ioe;
123+
}
124+
return bytesRead;
104125
}
105126

106127

@@ -118,12 +139,37 @@ public int available() throws IOException {
118139
@Override
119140
public void close() throws IOException {
120141
if (inputStream != null) {
121-
inputStream.close();
122-
inputStream = null;
123-
super.close();
142+
try {
143+
inputStream.close();
144+
inputStream = null;
145+
super.close();
146+
} catch (IOException ioe) {
147+
LOG.debug("Failure closing stream {}: ", key);
148+
throw ioe;
149+
}
124150
}
125151
}
126152

153+
/**
154+
* Close the stream on read failure.
155+
* No attempt is made to recover from the failure.
156+
* @param ioe exception caught.
157+
*/
158+
@Retries.OnceTranslated
159+
private void onReadFailure(IOException ioe) throws IOException {
160+
if (LOG.isDebugEnabled()) {
161+
LOG.debug("Got exception while trying to read from stream {}, " +
162+
"not trying to recover:",
163+
key, ioe);
164+
} else {
165+
LOG.info("Got exception while trying to read from stream {}, " +
166+
"not trying to recover:",
167+
key, ioe);
168+
}
169+
this.close();
170+
}
171+
172+
127173
protected void throwIfClosed() throws IOException {
128174
if (isClosed()) {
129175
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
3232
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
33-
import static org.apache.hadoop.fs.s3a.Constants.USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR;
33+
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
3434
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
3535

3636
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
@@ -48,7 +48,7 @@ public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOExc
4848
Configuration conf = getConfiguration();
4949
removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
5050
conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
51-
conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);
51+
conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
5252

5353
String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
5454
S3AFileSystem s3AFileSystem = (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf);
@@ -83,7 +83,7 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
8383
//Set Blobstore Capacity
8484
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
8585

86-
conf.setBoolean(USE_CRT_CLIENT_WITH_S3A_ANALYTICS_ACCELERATOR, useCrtClient);
86+
conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
8787

8888
ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
8989

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.hadoop.security.UserGroupInformation;
4848

4949
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
50+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
5051
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
5152
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
5253
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -182,6 +183,7 @@ public void setup() throws Exception {
182183
// destroy all filesystems from previous runs.
183184
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
184185
super.setup();
186+
skipIfAnalyticsAcceleratorEnabled(createConfiguration());
185187
jobId = randomJobId();
186188
attempt0 = "attempt_" + jobId + "_m_000000_0";
187189
taskAttempt0 = TaskAttemptID.forName(attempt0);

0 commit comments

Comments
 (0)