Skip to content

Commit 0f01d61

Browse files
committed
HADOOP-19354. stream factories
Moves all prefetching stream related options into the prefetching stream factory; the standard ReadOpContext removes them, so a new PrefetchingOptions is passed around. Stream factories can now declare how many extra shared threads they want and whether or not to create a future pool around the bounded pool. This is used in S3AFileSystem when creating its thread pools - this class no longer reads in any of the prefetching options. All tests which enable/disable prefetching, or probe for its state, now use S3ATestUtils methods for this. This avoids them having to now explicitly unset two properties, set the new input stream type, and handle any more complications in test setup in future. Everything under S3AStore is a service, so service lifecycle matches everywhere - and store just adds to the list of managed services for start/stop/close integration. + adjust assertions in ITestS3AInputStreamLeakage for prefetching + update the prefetching.md doc for factory changes + javadocs + add string values of type names to Constants. Once the analytics stream is in, a full doc on "stream performance" will be needed. The package for this stuff is now impl.streams. Change-Id: Id6356d2ded2c477ba16cbb9027ac0cfbece2a542
1 parent f034480 commit 0f01d61

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+698
-368
lines changed

hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
</Match>
3131
<!-- we are using completable futures, so ignore the Future which submit() returns -->
3232
<Match>
33-
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
33+
<Class name="org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl" />
3434
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
3535
</Match>
3636

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1561,9 +1561,31 @@ private Constants() {
15611561
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";
15621562

15631563

1564+
/**
1565+
* Input stream type: {@value}.
1566+
*/
15641567
public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";
15651568

1566-
public static final String INPUT_STREAM_TYPE_DEFAULT = "classic";
1569+
/**
1570+
* The classic input stream: {@value}.
1571+
*/
1572+
public static final String INPUT_STREAM_TYPE_CLASSIC = "classic";
1573+
1574+
/**
1575+
* The prefetching input stream: {@value}.
1576+
*/
1577+
public static final String INPUT_STREAM_TYPE_PREFETCH = "prefetch";
1578+
1579+
/**
1580+
* The analytics input stream: {@value}.
1581+
*/
1582+
public static final String INPUT_STREAM_TYPE_ANALYTICS = "analytics";
1583+
1584+
/**
1585+
* The default input stream.
1586+
* Currently {@link #INPUT_STREAM_TYPE_CLASSIC}
1587+
*/
1588+
public static final String INPUT_STREAM_TYPE_DEFAULT = INPUT_STREAM_TYPE_CLASSIC;
15671589

15681590
/**
15691591
* Controls whether the prefetching input stream is enabled.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InputStreamType.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,11 @@
5252
import java.util.concurrent.atomic.AtomicBoolean;
5353
import javax.annotation.Nullable;
5454

55-
import software.amazon.awssdk.core.ResponseInputStream;
5655
import software.amazon.awssdk.core.exception.SdkException;
5756
import software.amazon.awssdk.services.s3.S3Client;
5857
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
5958
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
6059
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
61-
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
62-
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
6360
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
6461
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
6562
import software.amazon.awssdk.services.s3.model.MultipartUpload;
@@ -152,11 +149,9 @@
152149
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
153150
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
154151
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
155-
import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
156-
import org.apache.hadoop.fs.s3a.impl.ClassicObjectInputStreamFactory;
157-
import org.apache.hadoop.fs.s3a.impl.model.ObjectReadParameters;
158-
import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStreamFactory;
159-
import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStreamCallbacks;
152+
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
153+
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
154+
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
160155
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
161156
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
162157
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -172,7 +167,6 @@
172167
import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
173168
import org.apache.hadoop.fs.store.audit.AuditSpan;
174169
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
175-
import org.apache.hadoop.io.IOUtils;
176170
import org.apache.hadoop.io.Text;
177171
import org.apache.hadoop.security.AccessControlException;
178172
import org.apache.hadoop.security.token.DelegationTokenIssuer;
@@ -341,18 +335,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
341335
private ExecutorService boundedThreadPool;
342336
private ThreadPoolExecutor unboundedThreadPool;
343337

344-
// S3 reads are prefetched asynchronously using this future pool.
338+
/**
339+
* Future pool built on the bounded thread pool.
340+
*/
345341
private ExecutorServiceFuturePool futurePool;
346342

347-
// If true, the prefetching input stream is used for reads.
348-
private boolean prefetchEnabled;
349-
350-
// Size in bytes of a single prefetch block.
351-
private int prefetchBlockSize;
352-
353-
// Size of prefetch queue (in number of blocks).
354-
private int prefetchBlockCount;
355-
356343
private int executorCapacity;
357344
private long multiPartThreshold;
358345
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@@ -673,22 +660,11 @@ public void initialize(URI name, Configuration originalConf)
673660
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
674661
s3ExpressStore);
675662

676-
this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
677-
long prefetchBlockSizeLong =
678-
longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
679-
if (prefetchBlockSizeLong > (long) Integer.MAX_VALUE) {
680-
throw new IOException("S3A prefatch block size exceeds int limit");
681-
}
682-
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
683-
this.prefetchBlockCount =
684-
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
685663
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
686664
DEFAULT_MULTIPART_UPLOAD_ENABLED);
687665
// multipart copy and upload are the same; this just makes it explicit
688666
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
689667

690-
initThreadPools(conf);
691-
692668
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
693669
if (listVersion < 1 || listVersion > 2) {
694670
LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
@@ -811,6 +787,10 @@ public void initialize(URI name, Configuration originalConf)
811787
// directly through the client manager.
812788
// this is to aid mocking.
813789
s3Client = getStore().getOrCreateS3Client();
790+
791+
// thread pool init requires store to be created
792+
initThreadPools();
793+
814794
// The filesystem is now ready to perform operations against
815795
// S3
816796
// This initiates a probe against S3 for the bucket existing.
@@ -959,12 +939,15 @@ public Statistics getInstanceStatistics() {
959939
}
960940

961941
/**
962-
* Initialize the thread pool.
942+
* Initialize the thread pools.
963943
* This must be re-invoked after replacing the S3Client during test
964944
* runs.
965945
* @param conf configuration.
966946
*/
967-
private void initThreadPools(Configuration conf) {
947+
private void initThreadPools() {
948+
949+
Configuration conf = getConf();
950+
968951
final String name = "s3a-transfer-" + getBucket();
969952
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
970953
if (maxThreads < 2) {
@@ -980,7 +963,9 @@ private void initThreadPools(Configuration conf) {
980963
TimeUnit.SECONDS,
981964
Duration.ZERO).getSeconds();
982965

983-
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
966+
final ObjectInputStreamFactory.ThreadOptions requirements =
967+
getStore().prefetchThreadRequirements();
968+
int numPrefetchThreads = requirements.sharedThreads();
984969

985970
int activeTasksForBoundedThreadPool = maxThreads;
986971
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
@@ -998,7 +983,7 @@ private void initThreadPools(Configuration conf) {
998983
unboundedThreadPool.allowCoreThreadTimeOut(true);
999984
executorCapacity = intOption(conf,
1000985
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
1001-
if (prefetchEnabled) {
986+
if (requirements.createFuturePool()) {
1002987
final S3AInputStreamStatistics s3AInputStreamStatistics =
1003988
statisticsContext.newInputStreamStatistics();
1004989
futurePool = new ExecutorServiceFuturePool(
@@ -1987,9 +1972,8 @@ protected S3AReadOpContext createReadContext(
19871972
fileStatus,
19881973
vectoredIOContext,
19891974
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(),
1990-
futurePool,
1991-
prefetchBlockSize,
1992-
prefetchBlockCount)
1975+
futurePool
1976+
)
19931977
.withAuditSpan(auditSpan);
19941978
openFileHelper.applyDefaultOptions(roc);
19951979
return roc.build();
@@ -5519,7 +5503,7 @@ public boolean hasPathCapability(final Path path, final String capability)
55195503

55205504
// stream leak detection.
55215505
case StreamStatisticNames.STREAM_LEAKS:
5522-
return !prefetchEnabled;
5506+
return true;
55235507

55245508
default:
55255509
// is it a performance flag?

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@
3939
import org.slf4j.LoggerFactory;
4040

4141
import org.apache.hadoop.fs.impl.LeakReporter;
42-
import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStream;
43-
import org.apache.hadoop.fs.s3a.impl.model.ObjectReadParameters;
44-
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
42+
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
43+
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
4544
import org.apache.hadoop.util.Preconditions;
4645
import org.apache.hadoop.classification.InterfaceAudience;
4746
import org.apache.hadoop.classification.InterfaceStability;
@@ -191,7 +190,7 @@ public S3AInputStream(ObjectReadParameters parameters) {
191190
S3AReadOpContext context = getContext();
192191
this.changeTracker = new ChangeTracker(getUri(),
193192
context.getChangeDetectionPolicy(),
194-
getStreamStatistics().getChangeTrackerStatistics(),
193+
getS3AStreamStatistics().getChangeTrackerStatistics(),
195194
getObjectAttributes());
196195
setReadahead(context.getReadahead());
197196
this.asyncDrainThreshold = context.getAsyncDrainThreshold();
@@ -873,7 +872,8 @@ public synchronized void readVectored(List<? extends FileRange> ranges,
873872

874873
if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
875874
LOG.debug("Not merging the ranges as they are disjoint");
876-
getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
875+
getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
876+
sortedRanges.size());
877877
for (FileRange range: sortedRanges) {
878878
ByteBuffer buffer = allocate.apply(range.getLength());
879879
getBoundedThreadPool().submit(() -> readSingleRange(range, buffer));
@@ -883,7 +883,8 @@ public synchronized void readVectored(List<? extends FileRange> ranges,
883883
List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
884884
1, minSeekForVectorReads(),
885885
maxReadSizeForVectorReads());
886-
getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
886+
getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
887+
combinedFileRanges.size());
887888
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
888889
ranges.size(), combinedFileRanges.size());
889890
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
@@ -1289,9 +1290,9 @@ public synchronized void unbuffer() {
12891290
@Override
12901291
public boolean hasCapability(String capability) {
12911292
switch (toLowerCase(capability)) {
1292-
case StreamCapabilities.IOSTATISTICS:
12931293
case StreamCapabilities.IOSTATISTICS_CONTEXT:
1294-
case StreamStatisticNames.STREAM_LEAKS:
1294+
case StreamCapabilities.READAHEAD:
1295+
case StreamCapabilities.UNBUFFER:
12951296
return true;
12961297
default:
12971298
return super.hasCapability(capability);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java

Lines changed: 6 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,11 @@ public class S3AReadOpContext extends S3AOpContext {
7575
/** Thread-level IOStatistics aggregator. **/
7676
private final IOStatisticsAggregator ioStatisticsAggregator;
7777

78-
// S3 reads are prefetched asynchronously using this future pool.
78+
/**
79+
* Pool for any future IO.
80+
*/
7981
private ExecutorServiceFuturePool futurePool;
8082

81-
// Size in bytes of a single prefetch block.
82-
private final int prefetchBlockSize;
83-
84-
// Size of prefetch queue (in number of blocks).
85-
private final int prefetchBlockCount;
86-
8783
/**
8884
* Instantiate.
8985
* @param path path of read
@@ -93,9 +89,7 @@ public class S3AReadOpContext extends S3AOpContext {
9389
* @param dstFileStatus target file status
9490
* @param vectoredIOContext context for vectored read operation.
9591
* @param ioStatisticsAggregator IOStatistics aggregator for each thread.
96-
* @param futurePool the ExecutorServiceFuturePool instance used by async prefetches.
97-
* @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
98-
* @param prefetchBlockCount maximum number of prefetched blocks.
92+
* @param futurePool Pool for any future IO
9993
*/
10094
public S3AReadOpContext(
10195
final Path path,
@@ -105,22 +99,15 @@ public S3AReadOpContext(
10599
FileStatus dstFileStatus,
106100
VectoredIOContext vectoredIOContext,
107101
IOStatisticsAggregator ioStatisticsAggregator,
108-
ExecutorServiceFuturePool futurePool,
109-
int prefetchBlockSize,
110-
int prefetchBlockCount) {
102+
ExecutorServiceFuturePool futurePool) {
111103

112104
super(invoker, stats, instrumentation,
113105
dstFileStatus);
114106
this.path = requireNonNull(path);
115107
this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
116108
this.ioStatisticsAggregator = ioStatisticsAggregator;
117109
this.futurePool = futurePool;
118-
Preconditions.checkArgument(
119-
prefetchBlockSize > 0, "invalid prefetchBlockSize %d", prefetchBlockSize);
120-
this.prefetchBlockSize = prefetchBlockSize;
121-
Preconditions.checkArgument(
122-
prefetchBlockCount > 0, "invalid prefetchBlockCount %d", prefetchBlockCount);
123-
this.prefetchBlockCount = prefetchBlockCount;
110+
124111
}
125112

126113
/**
@@ -265,23 +252,6 @@ public ExecutorServiceFuturePool getFuturePool() {
265252
return this.futurePool;
266253
}
267254

268-
/**
269-
* Gets the size in bytes of a single prefetch block.
270-
*
271-
* @return the size in bytes of a single prefetch block.
272-
*/
273-
public int getPrefetchBlockSize() {
274-
return this.prefetchBlockSize;
275-
}
276-
277-
/**
278-
* Gets the size of prefetch queue (in number of blocks).
279-
*
280-
* @return the size of prefetch queue (in number of blocks).
281-
*/
282-
public int getPrefetchBlockCount() {
283-
return this.prefetchBlockCount;
284-
}
285255

286256
@Override
287257
public String toString() {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,14 @@
4646
import org.apache.hadoop.classification.InterfaceAudience;
4747
import org.apache.hadoop.classification.InterfaceStability;
4848
import org.apache.hadoop.conf.Configuration;
49-
import org.apache.hadoop.fs.FSDataInputStream;
5049
import org.apache.hadoop.fs.LocalDirAllocator;
5150
import org.apache.hadoop.fs.s3a.api.RequestFactory;
5251
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
5352
import org.apache.hadoop.fs.s3a.impl.ClientManager;
5453
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
5554
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
5655
import org.apache.hadoop.fs.s3a.impl.StoreContext;
57-
import org.apache.hadoop.fs.s3a.impl.model.ObjectInputStreamFactory;
58-
import org.apache.hadoop.fs.s3a.impl.model.ObjectReadParameters;
56+
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
5957
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
6058
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
6159
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
@@ -76,7 +74,11 @@
7674
*/
7775
@InterfaceAudience.LimitedPrivate("Extensions")
7876
@InterfaceStability.Unstable
79-
public interface S3AStore extends Service, IOStatisticsSource, ClientManager, ObjectInputStreamFactory {
77+
public interface S3AStore extends
78+
ClientManager,
79+
IOStatisticsSource,
80+
ObjectInputStreamFactory,
81+
Service {
8082

8183
/**
8284
* Acquire write capacity for operations.

0 commit comments

Comments
 (0)