
Commit f034480

ahmarsuhail authored and steveloughran committed
initial commit
Change-Id: If42bdd0b227c4da07c62a410a998e6d8c35581f6
1 parent 82dbe03 commit f034480

File tree

6 files changed: +168 -77 lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 5 additions & 0 deletions
@@ -1560,6 +1560,11 @@ private Constants() {
    */
   public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";
 
+
+  public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";
+
+  public static final String INPUT_STREAM_TYPE_DEFAULT = "classic";
+
   /**
    * Controls whether the prefetching input stream is enabled.
    */
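
The new option selects the S3A input stream implementation by name. As a minimal illustrative sketch (not part of this commit), a client would be expected to set it like any other S3A option:

  import org.apache.hadoop.conf.Configuration;

  Configuration conf = new Configuration();
  // Ask for the prefetching stream; when unset, the default is "classic".
  conf.set("fs.s3a.input.stream.type", "prefetch");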
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InputStreamType.java

Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+package org.apache.hadoop.fs.s3a;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum InputStreamType {
+  CLASSIC("classic"),
+  PREFETCH("prefetch"),
+  ANALYTICS("analytics");
+
+  private final String name;
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputStreamType.class);
+
+  InputStreamType(String name) {
+    this.name = name;
+  }
+
+  public static InputStreamType fromString(String inputStreamType) {
+    for (InputStreamType value : values()) {
+      if (value.name.equalsIgnoreCase(inputStreamType)) {
+        return value;
+      }
+    }
+    LOG.warn("Unknown input stream type {}, using default classic stream.", inputStreamType);
+
+    return CLASSIC;
+  }
+}
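
fromString() matches names case-insensitively and never throws: an unrecognised value logs a warning and falls back to CLASSIC, so a configuration typo degrades to the classic stream rather than failing. Illustrative calls (not in the commit):

  InputStreamType.fromString("PreFetch");   // PREFETCH: case is ignored
  InputStreamType.fromString("analytics");  // ANALYTICS
  InputStreamType.fromString("bogus");      // warns, returns CLASSIC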

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 4 additions & 75 deletions
@@ -126,6 +126,7 @@
 import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
 import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
 import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
+import org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl;
 import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
 import org.apache.hadoop.fs.s3a.impl.CSEV1CompatibleS3AFileSystemOperations;
 import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
@@ -1884,6 +1885,7 @@ private FSDataInputStream executeOpen(
         auditSpan);
     fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
+    // QUESTION: why are we creating a new executor on each open?
     final SemaphoredDelegatingExecutor pool = new SemaphoredDelegatingExecutor(
         boundedThreadPool,
         vectoredActiveRangeReads,
@@ -1901,21 +1903,7 @@ private FSDataInputStream executeOpen(
     // TODO: move into S3AStore and export the factory API through
     // the store, which will add some of the features (callbacks, stats)
     // before invoking the real factory
-    ObjectInputStreamFactory factory = null;
-    try {
-      // Choose factory.
-      if (prefetchEnabled) {
-        factory = new PrefetchingInputStreamFactory();
-      } else {
-        factory = new ClassicObjectInputStreamFactory();
-      }
-      factory.init(getConf());
-      factory.start();
-      return new FSDataInputStream(factory.readObject(parameters));
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, factory);
-    }
-
+    return new FSDataInputStream(getStore().readObject(parameters));
   }
 
   /**
@@ -1924,68 +1912,9 @@ private FSDataInputStream executeOpen(
    */
   private ObjectInputStreamCallbacks createInputStreamCallbacks(
       final AuditSpan auditSpan) {
-    return new InputStreamCallbacksImpl(auditSpan);
+    return new InputStreamCallbacksImpl(auditSpan, getStore(), fsHandler, unboundedThreadPool);
   }
 
-  /**
-   * Operations needed by ObjectInputStreams to read data.
-   */
-  private final class InputStreamCallbacksImpl implements
-      ObjectInputStreamCallbacks {
-
-    /**
-     * Audit span to activate before each call.
-     */
-    private final AuditSpan auditSpan;
-
-    /**
-     * Create.
-     * @param auditSpan Audit span to activate before each call.
-     */
-    private InputStreamCallbacksImpl(final AuditSpan auditSpan) {
-      this.auditSpan = requireNonNull(auditSpan);
-    }
-
-    /**
-     * Closes the audit span.
-     */
-    @Override
-    public void close() {
-      auditSpan.close();
-    }
-
-    @Override
-    public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
-      // active the audit span used for the operation
-      try (AuditSpan span = auditSpan.activate()) {
-        return getRequestFactory().newGetObjectRequestBuilder(key);
-      }
-    }
-
-    @Override
-    public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws
-        IOException {
-      // active the audit span used for the operation
-      try (AuditSpan span = auditSpan.activate()) {
-        return fsHandler.getObject(getStore(), request, getRequestFactory());
-      }
-    }
-
-    @Override
-    public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
-      CompletableFuture<T> result = new CompletableFuture<>();
-      unboundedThreadPool.submit(() ->
-          LambdaUtils.eval(result, () -> {
-            LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
-            try (AuditSpan span = auditSpan.activate()) {
-              return operation.apply();
-            } finally {
-              LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
-            }
-          }));
-      return result;
-    }
-  }
 
   /**
    * Callbacks for WriteOperationHelper.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java

Lines changed: 4 additions & 1 deletion
@@ -46,13 +46,16 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.impl.ClientManager;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
 import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStreamFactory;
+import org.apache.hadoop.fs.s3a.impl.model.ObjectReadParameters;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@@ -73,7 +76,7 @@
  */
 @InterfaceAudience.LimitedPrivate("Extensions")
 @InterfaceStability.Unstable
-public interface S3AStore extends Service, IOStatisticsSource, ClientManager {
+public interface S3AStore extends Service, IOStatisticsSource, ClientManager, ObjectInputStreamFactory {
 
   /**
    * Acquire write capacity for operations.
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InputStreamCallbacksImpl.java

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStreamCallbacks;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
+import org.apache.hadoop.util.LambdaUtils;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+
+public class InputStreamCallbacksImpl implements ObjectInputStreamCallbacks {
+
+  /**
+   * Audit span to activate before each call.
+   */
+  private final AuditSpan auditSpan;
+
+  private final S3AStore store;
+
+  private final S3AFileSystemOperations fsHandler;
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputStreamCallbacksImpl.class);
+
+  private final ThreadPoolExecutor unboundedThreadPool;
+
+  /**
+   * Create.
+   * @param auditSpan Audit span to activate before each call.
+   */
+  public InputStreamCallbacksImpl(final AuditSpan auditSpan, final S3AStore store,
+      S3AFileSystemOperations fsHandler, ThreadPoolExecutor unboundedThreadPool) {
+    this.auditSpan = requireNonNull(auditSpan);
+    this.store = requireNonNull(store);
+    this.fsHandler = requireNonNull(fsHandler);
+    this.unboundedThreadPool = requireNonNull(unboundedThreadPool);
+  }
+
+  /**
+   * Closes the audit span.
+   */
+  @Override
+  public void close() {
+    auditSpan.close();
+  }
+
+  @Override
+  public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
+    // active the audit span used for the operation
+    try (AuditSpan span = auditSpan.activate()) {
+      return store.getRequestFactory().newGetObjectRequestBuilder(key);
+    }
+  }
+
+  @Override
+  public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws
+      IOException {
+    // active the audit span used for the operation
+    try (AuditSpan span = auditSpan.activate()) {
+      return fsHandler.getObject(store, request, store.getRequestFactory());
+    }
+  }
+
+  @Override
+  public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
+    CompletableFuture<T> result = new CompletableFuture<>();
+    unboundedThreadPool.submit(() ->
+        LambdaUtils.eval(result, () -> {
+          LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
+          try (AuditSpan span = auditSpan.activate()) {
+            return operation.apply();
+          } finally {
+            LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
+          }
+        }));
+    return result;
+  }
+}
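
Compared with the inner class it replaces, this standalone implementation receives its collaborators (store, fsHandler, unboundedThreadPool) through the constructor rather than capturing S3AFileSystem state, which is what lets stream creation move behind S3AStore. submit() runs the supplied operation on the unbounded pool with the stream's audit span activated; an illustrative caller (the callbacks variable is hypothetical):

  CompletableFuture<String> f = callbacks.submit(() -> {
    // executes on unboundedThreadPool, inside the activated audit span
    return "done";
  });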

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 39 additions & 1 deletion
@@ -58,9 +58,11 @@
 import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.InputStreamType;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
 import org.apache.hadoop.fs.s3a.Retries;
@@ -72,6 +74,10 @@
 import org.apache.hadoop.fs.s3a.UploadInfo;
 import org.apache.hadoop.fs.s3a.api.RequestFactory;
 import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
+import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStream;
+import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStreamFactory;
+import org.apache.hadoop.fs.s3a.impl.model.ObjectReadParameters;
+import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
@@ -85,6 +91,8 @@
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
 import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE_DEFAULT;
 import static org.apache.hadoop.fs.s3a.S3AUtils.extractException;
 import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength;
 import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
@@ -108,6 +116,7 @@
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 
 /**
@@ -116,7 +125,7 @@
  * to move.
  */
 public class S3AStoreImpl extends CompositeService
-    implements S3AStore {
+    implements S3AStore, ObjectInputStreamFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(S3AStoreImpl.class);
 
@@ -177,6 +186,8 @@ public class S3AStoreImpl extends CompositeService
    */
   private LocalDirAllocator directoryAllocator;
 
+  private ObjectInputStreamFactory objectInputStreamFactory;
+
   /** Constructor to create S3A store. */
   S3AStoreImpl(StoreContextFactory storeContextFactory,
       ClientManager clientManager,
@@ -209,12 +220,14 @@ public class S3AStoreImpl extends CompositeService
   protected void serviceStart() throws Exception {
     super.serviceStart();
     initLocalDirAllocator();
+    initInputStreamFactory();
   }
 
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
     clientManager.close();
+    cleanupWithLogger(LOG, objectInputStreamFactory);
   }
 
   /**
@@ -226,6 +239,20 @@ private void initLocalDirAllocator() {
     directoryAllocator = new LocalDirAllocator(bufferDir);
   }
 
+  private void initInputStreamFactory() {
+    InputStreamType inputStreamType = InputStreamType.fromString(getConfig().get(INPUT_STREAM_TYPE, INPUT_STREAM_TYPE_DEFAULT));
+    switch (inputStreamType) {
+    case PREFETCH:
+      this.objectInputStreamFactory = new PrefetchingInputStreamFactory();
+      break;
+    default:
+      this.objectInputStreamFactory = new ClassicObjectInputStreamFactory();
+    }
+
+    this.objectInputStreamFactory.init(getConfig());
+    this.objectInputStreamFactory.start();
+  }
+
   /** Acquire write capacity for rate limiting {@inheritDoc}. */
   @Override
   public Duration acquireWriteCapacity(final int capacity) {
@@ -871,4 +898,15 @@ public File createTemporaryFileForWriting(String pathStr,
     return File.createTempFile(prefix, null, dir);
   }
 
+  @Override
+  public ObjectInputStream readObject(ObjectReadParameters parameters)
+      throws IOException {
+    if (objectInputStreamFactory != null) {
+      return objectInputStreamFactory.readObject(parameters);
+    } else {
+      // TODO: Find the right exception to throw if factory has not yet been initialised, or closed.
+      throw new IOException("Factory not initialized!");
+    }
+  }
+
 }
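
Taken together, stream-factory selection now lives in the store: the factory is chosen from fs.s3a.input.stream.type and started in serviceStart(), and S3AFileSystem.executeOpen() simply delegates to getStore().readObject(). Note that the switch has no ANALYTICS case yet, so "analytics" currently falls back to the classic factory. A minimal end-to-end sketch (bucket and object path are placeholders, not from the commit):

  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  Configuration conf = new Configuration();
  conf.set("fs.s3a.input.stream.type", "prefetch");  // default: "classic"

  // The store picks and starts the matching factory during serviceStart(),
  // so this open() is served by the prefetching stream factory.
  FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
  try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.csv"))) {
    in.read();
  }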
