Skip to content

Commit 8862575

Browse files
author
Jian Zhang
committed
Merge branch 'HDFS-17531' into HDFS-17552
2 parents 3a08429 + 78204d9 commit 8862575

File tree

15 files changed

+2299
-0
lines changed

15 files changed

+2299
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.Executor;
23+
24+
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
25+
26+
/**
27+
* Represents a function that accepts a value of type T and produces a result of type R.
28+
* This interface extends {@link Async} and provides methods to apply the function
29+
* asynchronously using {@link CompletableFuture}.
30+
*
31+
* <p>ApplyFunction is used to implement the following semantics:</p>
32+
* <pre>
33+
* {@code
34+
* T res = doAsync(input);
35+
* // Can use ApplyFunction
36+
* R result = thenApply(res);
37+
* }
38+
* </pre>
39+
*
40+
* @param <T> the type of the input to the function
41+
* @param <R> the type of the result of the function
42+
*/
43+
@FunctionalInterface
44+
public interface ApplyFunction<T, R> extends Async<R>{
45+
46+
/**
47+
* Applies this function to the given argument.
48+
*
49+
* @param t the function argument
50+
* @return the function result
51+
* @throws IOException if an I/O error occurs
52+
*/
53+
R apply(T t) throws IOException;
54+
55+
/**
56+
* Applies this function asynchronously to the result of the given {@link CompletableFuture}.
57+
* The function is executed on the same thread as the completion of the given future.
58+
*
59+
* @param in the input future
60+
* @return a new future that holds the result of the function application
61+
*/
62+
default CompletableFuture<R> apply(CompletableFuture<T> in) {
63+
return in.thenApply(t -> {
64+
try {
65+
return ApplyFunction.this.apply(t);
66+
} catch (IOException e) {
67+
throw warpCompletionException(e);
68+
}
69+
});
70+
}
71+
72+
/**
73+
* Applies this function asynchronously to the result of the given {@link CompletableFuture},
74+
* using the specified executor for the asynchronous computation.
75+
*
76+
* @param in the input future
77+
* @param executor the executor to use for the asynchronous computation
78+
* @return a new future that holds the result of the function application
79+
*/
80+
default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
81+
return in.thenApplyAsync(t -> {
82+
try {
83+
return ApplyFunction.this.apply(t);
84+
} catch (IOException e) {
85+
throw warpCompletionException(e);
86+
}
87+
}, executor);
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ExecutionException;
23+
import java.util.concurrent.CompletionException;
24+
25+
/**
26+
* An interface for asynchronous operations, providing utility methods
27+
* and constants related to asynchronous computations.
28+
*
29+
* @param <R> The type of the result of the asynchronous operation
30+
*/
31+
public interface Async<R> {
32+
33+
/**
34+
* A thread-local variable to store the {@link CompletableFuture} instance for the current thread.
35+
* <p>
36+
* <b>Note:</b> After executing an asynchronous method, the thread stores the CompletableFuture
37+
* of the asynchronous method in the thread's local variable
38+
*/
39+
ThreadLocal<CompletableFuture<Object>> CUR_COMPLETABLE_FUTURE
40+
= new ThreadLocal<>();
41+
42+
/**
43+
* Sets the {@link CompletableFuture} instance for the current thread.
44+
*
45+
* @param completableFuture The {@link CompletableFuture} instance to be set
46+
* @param <T> The type of the result in the CompletableFuture
47+
*/
48+
default <T> void setCurCompletableFuture(CompletableFuture<T> completableFuture) {
49+
CUR_COMPLETABLE_FUTURE.set((CompletableFuture<Object>) completableFuture);
50+
}
51+
52+
/**
53+
* Gets the {@link CompletableFuture} instance for the current thread.
54+
*
55+
* @return The {@link CompletableFuture} instance for the current thread,
56+
* or {@code null} if not set
57+
*/
58+
default CompletableFuture<R> getCurCompletableFuture() {
59+
return (CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
60+
}
61+
62+
/**
63+
* Blocks and retrieves the result of the {@link CompletableFuture} instance
64+
* for the current thread.
65+
*
66+
* @return The result of the CompletableFuture, or {@code null} if the thread was interrupted
67+
* @throws IOException If the completion exception to the CompletableFuture
68+
* is an IOException or a subclass of it
69+
*/
70+
default R result() throws IOException {
71+
try {
72+
CompletableFuture<R> completableFuture =
73+
(CompletableFuture<R>) CUR_COMPLETABLE_FUTURE.get();
74+
assert completableFuture != null;
75+
return completableFuture.get();
76+
} catch (InterruptedException e) {
77+
return null;
78+
} catch (ExecutionException e) {
79+
Throwable cause = e.getCause();
80+
if (cause instanceof IOException) {
81+
throw (IOException)cause;
82+
}
83+
throw new IOException(e);
84+
}
85+
}
86+
87+
/**
88+
* Extracts the real cause of an exception wrapped by CompletionException.
89+
*
90+
* @param e The incoming exception, which may be a CompletionException.
91+
* @return Returns the real cause of the original exception,
92+
* or the original exception if there is no cause.
93+
*/
94+
static Throwable unWarpCompletionException(Throwable e) {
95+
if (e instanceof CompletionException) {
96+
if (e.getCause() != null) {
97+
return e.getCause();
98+
}
99+
}
100+
return e;
101+
}
102+
103+
/**
104+
* Wraps the incoming exception in a new CompletionException.
105+
*
106+
* @param e The incoming exception, which may be any type of Throwable.
107+
* @return Returns a new CompletionException with the original exception as its cause.
108+
*/
109+
static CompletionException warpCompletionException(Throwable e) {
110+
if (e instanceof CompletionException) {
111+
return (CompletionException) e;
112+
}
113+
return new CompletionException(e);
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs.server.federation.router.async;
19+
20+
import java.io.IOException;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.Executor;
23+
24+
import static org.apache.hadoop.hdfs.server.federation.router.async.Async.warpCompletionException;
25+
26+
/**
27+
* The AsyncApplyFunction interface represents a function that
28+
* asynchronously accepts a value of type T and produces a result
29+
* of type R. This interface extends {@link ApplyFunction} and is
30+
* designed to be used with asynchronous computation frameworks,
31+
* such as Java's {@link java.util.concurrent.CompletableFuture}.
32+
*
33+
* <p>An implementation of this interface is expected to perform an
34+
* asynchronous operation and return a result, which is typically
35+
* represented as a {@code CompletableFuture<R>}. This allows for
36+
* non-blocking execution of tasks and is particularly useful for
37+
* I/O operations or any operation that may take a significant amount
38+
* of time to complete.</p>
39+
*
40+
* <p>AsyncApplyFunction is used to implement the following semantics:</p>
41+
* <pre>
42+
* {@code
43+
* T res = doAsync1(input);
44+
* // Can use AsyncApplyFunction
45+
* R result = doAsync2(res);
46+
* }
47+
* </pre>
48+
*
49+
* @param <T> the type of the input to the function
50+
* @param <R> the type of the result of the function
51+
* @see ApplyFunction
52+
* @see java.util.concurrent.CompletableFuture
53+
*/
54+
@FunctionalInterface
55+
public interface AsyncApplyFunction<T, R> extends ApplyFunction<T, R> {
56+
57+
/**
58+
* Asynchronously applies this function to the given argument.
59+
*
60+
* <p>This method is intended to initiate the function application
61+
* without waiting for the result. It is typically used when the
62+
* result of the operation is not required immediately or when the
63+
* operation is part of a larger asynchronous workflow.</p>
64+
*
65+
* @param t the function argument
66+
* @throws IOException if an I/O error occurs during the application
67+
* of the function
68+
*/
69+
void applyAsync(T t) throws IOException;
70+
71+
/**
72+
* Synchronously applies this function to the given argument and
73+
* returns the result.
74+
*
75+
* <p>This method waits for the asynchronous operation to complete
76+
* and returns its result. It is useful when the result is needed
77+
* immediately and the calling code cannot proceed without it.</p>
78+
*
79+
* @param t the function argument
80+
* @return the result of applying the function to the argument
81+
* @throws IOException if an I/O error occurs during the application
82+
* of the function
83+
*/
84+
@Override
85+
default R apply(T t) throws IOException {
86+
applyAsync(t);
87+
return result();
88+
}
89+
90+
/**
91+
* Initiates the asynchronous application of this function to the given result.
92+
* <p>
93+
* This method calls applyAsync to start the asynchronous operation and then retrieves
94+
* the current thread's CompletableFuture using getCurCompletableFuture.
95+
* It returns this CompletableFuture, which will be completed with the result of the
96+
* asynchronous operation once it is finished.
97+
* <p>
98+
* This method is useful for chaining with other asynchronous operations, as it allows the
99+
* current operation to be part of a larger asynchronous workflow.
100+
*
101+
* @param t the function argument
102+
* @return a CompletableFuture that will be completed with the result of the
103+
* asynchronous operation
104+
* @throws IOException if an I/O error occurs during the initiation of the asynchronous operation
105+
*/
106+
default CompletableFuture<R> async(T t) throws IOException {
107+
applyAsync(t);
108+
CompletableFuture<R> completableFuture = getCurCompletableFuture();
109+
assert completableFuture != null;
110+
return completableFuture;
111+
}
112+
113+
/**
114+
* Asynchronously applies this function to the result of the given
115+
* CompletableFuture.
116+
*
117+
* <p>This method chains the function application to the completion
118+
* of the input future. It returns a new CompletableFuture that
119+
* completes with the function's result when the input future
120+
* completes.</p>
121+
*
122+
* @param in the input future
123+
* @return a new CompletableFuture that holds the result of the
124+
* function application
125+
*/
126+
@Override
127+
default CompletableFuture<R> apply(CompletableFuture<T> in) {
128+
return in.thenCompose(t -> {
129+
try {
130+
return async(t);
131+
} catch (IOException e) {
132+
throw warpCompletionException(e);
133+
}
134+
});
135+
}
136+
137+
/**
138+
* Asynchronously applies this function to the result of the given
139+
* CompletableFuture, using the specified executor for the
140+
* asynchronous computation.
141+
*
142+
* <p>This method allows for more control over the execution
143+
* context of the asynchronous operation, such as running the
144+
* operation in a separate thread or thread pool.</p>
145+
*
146+
* @param in the input future
147+
* @param executor the executor to use for the asynchronous
148+
* computation
149+
* @return a new CompletableFuture that holds the result of the
150+
* function application
151+
*/
152+
@Override
153+
default CompletableFuture<R> apply(CompletableFuture<T> in, Executor executor) {
154+
return in.thenComposeAsync(t -> {
155+
try {
156+
return async(t);
157+
} catch (IOException e) {
158+
throw warpCompletionException(e);
159+
}
160+
}, executor);
161+
}
162+
}

0 commit comments

Comments
 (0)