Skip to content

Commit 382b347

Browse files
HADOOP-19354. S3A: S3AInputStream to be created by factory under S3AStore (#7214)
S3 InputStreams are created by a factory class, with the choice of factory dynamically chosen by the option fs.s3a.input.stream.type Supported values: classic, prefetching, analytics, custom Contributed by Steve Loughran Change-Id: I85a039e798e24a72ee7b4902e4ff08a5d53ffd10
1 parent e76699b commit 382b347

File tree

66 files changed

+3652
-806
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+3652
-806
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ConfigurationHelper.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.Locale;
2424
import java.util.Map;
25+
import java.util.function.Function;
2526
import java.util.stream.Collectors;
2627

2728
import org.apache.hadoop.classification.InterfaceAudience;
@@ -123,4 +124,39 @@ public static <E extends Enum<E>> Map<String, E> mapEnumNamesToValues(
123124
return mapping;
124125
}
125126

127+
/**
128+
* Look up an enum from the configuration option and map it to
129+
* a value in the supplied enum class.
130+
* If no value is supplied or there is no match for the supplied value,
131+
* the fallback function is invoked, passing in the trimmed and possibly
132+
* empty string of the value.
133+
* Extends {link {@link Configuration#getEnum(String, Enum)}}
134+
* by adding case independence and a lambda expression for fallback,
135+
* rather than a default value.
136+
* @param conf configuration
137+
* @param name property name
138+
* @param enumClass classname to resolve
139+
* @param fallback fallback supplier
140+
* @param <E> enumeration type.
141+
* @return an enum value
142+
* @throws IllegalArgumentException If mapping is illegal for the type provided
143+
*/
144+
public static <E extends Enum<E>> E resolveEnum(
145+
Configuration conf,
146+
String name,
147+
Class<E> enumClass,
148+
Function<String, E> fallback) {
149+
150+
final String val = conf.getTrimmed(name, "");
151+
152+
// build a map of lower case string to enum values.
153+
final Map<String, E> mapping = mapEnumNamesToValues("", enumClass);
154+
final E mapped = mapping.get(val.toLowerCase(Locale.ROOT));
155+
if (mapped != null) {
156+
return mapped;
157+
} else {
158+
// fallback handles it
159+
return fallback.apply(val);
160+
}
161+
}
126162
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestConfigurationHelper.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import static org.apache.hadoop.util.ConfigurationHelper.ERROR_MULTIPLE_ELEMENTS_MATCHING_TO_LOWER_CASE_VALUE;
3232
import static org.apache.hadoop.util.ConfigurationHelper.mapEnumNamesToValues;
3333
import static org.apache.hadoop.util.ConfigurationHelper.parseEnumSet;
34+
import static org.apache.hadoop.util.ConfigurationHelper.resolveEnum;
3435

3536
/**
3637
* Test for {@link ConfigurationHelper}.
@@ -43,6 +44,12 @@ public class TestConfigurationHelper extends AbstractHadoopTestBase {
4344
*/
4445
private enum SimpleEnum { a, b, c, i }
4546

47+
/**
48+
* Upper case version of SimpleEnum.
49+
* "i" is included for case tests, as it is special in turkey.
50+
*/
51+
private enum UppercaseEnum { A, B, C, I }
52+
4653

4754
/**
4855
* Special case: an enum with no values.
@@ -171,4 +178,65 @@ public void testDuplicateValues() {
171178
.containsExactly(SimpleEnum.a, SimpleEnum.b, SimpleEnum.c);
172179
}
173180

181+
@Test
182+
public void testResolveEnumGood() throws Throwable {
183+
assertEnumResolution("c", SimpleEnum.c);
184+
}
185+
186+
@Test
187+
public void testResolveEnumTrimmed() throws Throwable {
188+
// strings are trimmed at each end
189+
assertEnumResolution("\n i \n ", SimpleEnum.i);
190+
}
191+
192+
@Test
193+
public void testResolveEnumCaseConversion() throws Throwable {
194+
assertEnumResolution("C", SimpleEnum.c);
195+
}
196+
197+
@Test
198+
public void testResolveEnumNoMatch() throws Throwable {
199+
assertEnumResolution("other", null);
200+
}
201+
202+
@Test
203+
public void testResolveEnumEmpty() throws Throwable {
204+
assertEnumResolution("", null);
205+
}
206+
207+
@Test
208+
public void testResolveEnumUpperCaseConversion() throws Throwable {
209+
assertUpperEnumResolution("C", UppercaseEnum.C);
210+
}
211+
212+
@Test
213+
public void testResolveLowerToUpperCaseConversion() throws Throwable {
214+
assertUpperEnumResolution("i", UppercaseEnum.I);
215+
}
216+
217+
/**
218+
* Assert that a string value in a configuration resolves to the expected
219+
* value.
220+
* @param value value to set
221+
* @param expected expected outcome, set to null for no resolution.
222+
*/
223+
private void assertEnumResolution(final String value, final SimpleEnum expected) {
224+
Assertions.assertThat(resolveEnum(confWithKey(value),
225+
"key", SimpleEnum.class, (v) -> null))
226+
.describedAs("Resolution of %s", value)
227+
.isEqualTo(expected);
228+
}
229+
230+
/**
231+
* Equivalent for Uppercase Enum.
232+
* @param value value to set
233+
* @param expected expected outcome, set to null for no resolution.
234+
*/
235+
private void assertUpperEnumResolution(final String value, UppercaseEnum expected) {
236+
Assertions.assertThat(resolveEnum(confWithKey(value),
237+
"key", UppercaseEnum.class, (v) -> null))
238+
.describedAs("Resolution of %s", value)
239+
.isEqualTo(expected);
240+
}
241+
174242
}

hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
</Match>
3131
<!-- we are using completable futures, so ignore the Future which submit() returns -->
3232
<Match>
33-
<Class name="org.apache.hadoop.fs.s3a.S3AFileSystem$InputStreamCallbacksImpl" />
33+
<Class name="org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl" />
3434
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
3535
</Match>
3636

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@
5454
<!-- marker retention policy -->
5555
<fs.s3a.directory.marker.retention></fs.s3a.directory.marker.retention>
5656

57-
<!-- Is prefetch enabled? -->
58-
<fs.s3a.prefetch.enabled>unset</fs.s3a.prefetch.enabled>
57+
<!-- stream type to use in tests; passed down in fs.s3a.input.stream.type -->
58+
<stream>classic</stream>
5959
<!-- Job ID; allows for parallel jobs on same bucket -->
6060
<!-- job.id is used to build the path for tests; default is 00.-->
6161
<job.id>00</job.id>
@@ -130,8 +130,8 @@
130130
<!-- Markers-->
131131
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
132132
<fs.s3a.directory.marker.audit>${fs.s3a.directory.marker.audit}</fs.s3a.directory.marker.audit>
133-
<!-- Prefetch -->
134-
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
133+
<!-- Stream Type -->
134+
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
135135
</systemPropertyVariables>
136136
</configuration>
137137
</plugin>
@@ -171,8 +171,8 @@
171171
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
172172

173173
<test.default.timeout>${test.integration.timeout}</test.default.timeout>
174-
<!-- Prefetch -->
175-
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
174+
<!-- Stream Type -->
175+
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
176176
<!-- are root tests enabled. Set to false when running parallel jobs on same bucket -->
177177
<fs.s3a.root.tests.enabled>${root.tests.enabled}</fs.s3a.root.tests.enabled>
178178

@@ -225,8 +225,8 @@
225225
<!-- Markers-->
226226
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
227227
<fs.s3a.directory.marker.audit>${fs.s3a.directory.marker.audit}</fs.s3a.directory.marker.audit>
228-
<!-- Prefetch -->
229-
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
228+
<!-- Stream Type -->
229+
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
230230
<!-- are root tests enabled. Set to false when running parallel jobs on same bucket -->
231231
<fs.s3a.root.tests.enabled>${root.tests.enabled}</fs.s3a.root.tests.enabled>
232232
<test.unique.fork.id>job-${job.id}</test.unique.fork.id>
@@ -289,8 +289,8 @@
289289
<!-- Markers-->
290290
<fs.s3a.directory.marker.retention>${fs.s3a.directory.marker.retention}</fs.s3a.directory.marker.retention>
291291
<fs.s3a.directory.marker.audit>${fs.s3a.directory.marker.audit}</fs.s3a.directory.marker.audit>
292-
<!-- Prefetch -->
293-
<fs.s3a.prefetch.enabled>${fs.s3a.prefetch.enabled}</fs.s3a.prefetch.enabled>
292+
<!-- Stream Type -->
293+
<fs.s3a.input.stream.type>${stream}</fs.s3a.input.stream.type>
294294
<test.unique.fork.id>job-${job.id}</test.unique.fork.id>
295295
</systemPropertyVariables>
296296
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
@@ -362,7 +362,20 @@
362362
</property>
363363
</activation>
364364
<properties>
365-
<fs.s3a.prefetch.enabled>true</fs.s3a.prefetch.enabled>
365+
<stream>prefetch</stream>
366+
</properties>
367+
</profile>
368+
369+
<!-- Switch to the analytics input stream-->
370+
<profile>
371+
<id>analytics</id>
372+
<activation>
373+
<property>
374+
<name>analytics</name>
375+
</property>
376+
</activation>
377+
<properties>
378+
<stream>analytics</stream>
366379
</properties>
367380
</profile>
368381

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.hadoop.classification.InterfaceAudience;
2222
import org.apache.hadoop.classification.InterfaceStability;
2323
import org.apache.hadoop.fs.Options;
24+
import org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration;
2425
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
2526

2627
import java.time.Duration;
@@ -1563,14 +1564,60 @@ private Constants() {
15631564
*/
15641565
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";
15651566

1567+
/**
1568+
* Input stream type: {@value}.
1569+
*/
1570+
public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";
1571+
1572+
/**
1573+
* The classic input stream: {@value}.
1574+
*/
1575+
public static final String INPUT_STREAM_TYPE_CLASSIC =
1576+
StreamIntegration.CLASSIC;
1577+
1578+
/**
1579+
* The prefetching input stream: {@value}.
1580+
*/
1581+
public static final String INPUT_STREAM_TYPE_PREFETCH = StreamIntegration.PREFETCH;
1582+
1583+
/**
1584+
* The analytics input stream: {@value}.
1585+
*/
1586+
public static final String INPUT_STREAM_TYPE_ANALYTICS =
1587+
StreamIntegration.ANALYTICS;
1588+
1589+
/**
1590+
* Request the default input stream,
1591+
* whatever it is for this release: {@value}.
1592+
*/
1593+
public static final String INPUT_STREAM_TYPE_DEFAULT = StreamIntegration.DEFAULT;
1594+
1595+
/**
1596+
* The custom input stream type: {@value}".
1597+
* If set, the classname is loaded from
1598+
* {@link #INPUT_STREAM_CUSTOM_FACTORY}.
1599+
* <p>
1600+
* This option is primarily for testing as it can
1601+
* be used to generated failures.
1602+
*/
1603+
public static final String INPUT_STREAM_TYPE_CUSTOM =
1604+
StreamIntegration.CUSTOM;
1605+
1606+
/**
1607+
* Classname of the factory to instantiate for custom streams: {@value}.
1608+
*/
1609+
public static final String INPUT_STREAM_CUSTOM_FACTORY = "fs.s3a.input.stream.custom.factory";
1610+
15661611
/**
15671612
* Controls whether the prefetching input stream is enabled.
15681613
*/
1614+
@Deprecated
15691615
public static final String PREFETCH_ENABLED_KEY = "fs.s3a.prefetch.enabled";
15701616

15711617
/**
15721618
* Default option as to whether the prefetching input stream is enabled.
15731619
*/
1620+
@Deprecated
15741621
public static final boolean PREFETCH_ENABLED_DEFAULT = false;
15751622

15761623
// If the default values are used, each file opened for reading will consume

0 commit comments

Comments
 (0)