Skip to content

Commit 3fff158

Browse files
committed
HDFS-11114. Support for running async disk checks in DataNode.
This closes #153.
1 parent 3dbad5d commit 3fff158

File tree

5 files changed

+638
-0
lines changed

5 files changed

+638
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.datanode.checker;
20+
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import org.apache.hadoop.classification.InterfaceAudience;
23+
import org.apache.hadoop.classification.InterfaceStability;
24+
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
28+
/**
29+
* A class that can be used to schedule an asynchronous check on a given
30+
* {@link Checkable}. If the check is successfully scheduled then a
31+
* {@link ListenableFuture} is returned.
32+
*
33+
*/
34+
@InterfaceAudience.Private
35+
@InterfaceStability.Unstable
36+
public interface AsyncChecker<K, V> {
37+
38+
/**
39+
* Schedule an asynchronous check for the given object.
40+
*
41+
* @param target object to be checked.
42+
*
43+
* @param context the interpretation of the context depends on the
44+
* target.
45+
*
46+
* @return returns a {@link ListenableFuture} that can be used to
47+
* retrieve the result of the asynchronous check.
48+
*/
49+
ListenableFuture<V> schedule(Checkable<K, V> target, K context);
50+
51+
/**
52+
* Cancel all executing checks and wait for them to complete.
53+
* First attempts a graceful cancellation, then cancels forcefully.
54+
* Waits for the supplied timeout after both attempts.
55+
*
56+
* See {@link ExecutorService#awaitTermination} for a description of
57+
* the parameters.
58+
*
59+
* @throws InterruptedException
60+
*/
61+
void shutdownAndWait(long timeout, TimeUnit timeUnit)
62+
throws InterruptedException;
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.datanode.checker;
20+
21+
import org.apache.hadoop.classification.InterfaceAudience;
22+
import org.apache.hadoop.classification.InterfaceStability;
23+
24+
25+
/**
26+
* A Checkable is an object whose health can be probed by invoking its
27+
* {@link #check} method.
28+
*
29+
* e.g. a {@link Checkable} instance may represent a single hardware
30+
* resource.
31+
*/
32+
@InterfaceAudience.Private
33+
@InterfaceStability.Unstable
34+
public interface Checkable<K, V> {
35+
36+
/**
37+
* Query the health of this object. This method may hang
38+
* indefinitely depending on the status of the target resource.
39+
*
40+
* @param context for the probe operation. May be null depending
41+
* on the implementation.
42+
*
43+
* @return result of the check operation.
44+
*
45+
* @throws Exception encountered during the check operation. An
46+
* exception indicates that the check failed.
47+
*/
48+
V check(K context) throws Exception;
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hdfs.server.datanode.checker;
20+
21+
import com.google.common.util.concurrent.FutureCallback;
22+
import com.google.common.util.concurrent.Futures;
23+
import com.google.common.util.concurrent.ListenableFuture;
24+
import com.google.common.util.concurrent.ListeningExecutorService;
25+
import com.google.common.util.concurrent.MoreExecutors;
26+
import org.apache.hadoop.classification.InterfaceAudience;
27+
import org.apache.hadoop.classification.InterfaceStability;
28+
import org.apache.hadoop.util.Timer;
29+
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import javax.annotation.Nonnull;
34+
import javax.annotation.Nullable;
35+
import java.util.HashMap;
36+
import java.util.Map;
37+
import java.util.WeakHashMap;
38+
import java.util.concurrent.Callable;
39+
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.TimeUnit;
41+
42+
/**
43+
* An implementation of {@link AsyncChecker} that skips checking recently
44+
* checked objects. It will enforce at least {@link minMsBetweenChecks}
45+
* milliseconds between two successive checks of any one object.
46+
*
47+
* It is assumed that the total number of Checkable objects in the system
48+
* is small, (not more than a few dozen) since the checker uses O(Checkables)
49+
* storage and also potentially O(Checkables) threads.
50+
*
51+
* {@link minMsBetweenChecks} should be configured reasonably
52+
* by the caller to avoid spinning up too many threads frequently.
53+
*/
54+
@InterfaceAudience.Private
55+
@InterfaceStability.Unstable
56+
public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
57+
public static final Logger LOG =
58+
LoggerFactory.getLogger(ThrottledAsyncChecker.class);
59+
60+
private final Timer timer;
61+
62+
/**
63+
* The ExecutorService used to schedule asynchronous checks.
64+
*/
65+
private final ListeningExecutorService executorService;
66+
67+
/**
68+
* The minimum gap in milliseconds between two successive checks
69+
* of the same object. This is the throttle.
70+
*/
71+
private final long minMsBetweenChecks;
72+
73+
/**
74+
* Map of checks that are currently in progress. Protected by the object
75+
* lock.
76+
*/
77+
private final Map<Checkable, ListenableFuture<V>> checksInProgress;
78+
79+
/**
80+
* Maps Checkable objects to a future that can be used to retrieve
81+
* the results of the operation.
82+
* Protected by the object lock.
83+
*/
84+
private final Map<Checkable, LastCheckResult<V>> completedChecks;
85+
86+
ThrottledAsyncChecker(final Timer timer,
87+
final long minMsBetweenChecks,
88+
final ExecutorService executorService) {
89+
this.timer = timer;
90+
this.minMsBetweenChecks = minMsBetweenChecks;
91+
this.executorService = MoreExecutors.listeningDecorator(executorService);
92+
this.checksInProgress = new HashMap<>();
93+
this.completedChecks = new WeakHashMap<>();
94+
}
95+
96+
/**
97+
* See {@link AsyncChecker#schedule}
98+
*
99+
* If the object has been checked recently then the check will
100+
* be skipped. Multiple concurrent checks for the same object
101+
* will receive the same Future.
102+
*/
103+
@Override
104+
public synchronized ListenableFuture<V> schedule(
105+
final Checkable<K, V> target,
106+
final K context) {
107+
LOG.debug("Scheduling a check of {}", target);
108+
109+
if (checksInProgress.containsKey(target)) {
110+
return checksInProgress.get(target);
111+
}
112+
113+
if (completedChecks.containsKey(target)) {
114+
final LastCheckResult<V> result = completedChecks.get(target);
115+
final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
116+
if (msSinceLastCheck < minMsBetweenChecks) {
117+
LOG.debug("Skipped checking {}. Time since last check {}ms " +
118+
"is less than the min gap {}ms.",
119+
target, msSinceLastCheck, minMsBetweenChecks);
120+
return result.result != null ?
121+
Futures.immediateFuture(result.result) :
122+
Futures.immediateFailedFuture(result.exception);
123+
}
124+
}
125+
126+
final ListenableFuture<V> lf = executorService.submit(
127+
new Callable<V>() {
128+
@Override
129+
public V call() throws Exception {
130+
return target.check(context);
131+
}
132+
});
133+
checksInProgress.put(target, lf);
134+
addResultCachingCallback(target, lf);
135+
return lf;
136+
}
137+
138+
/**
139+
* Register a callback to cache the result of a check.
140+
* @param target
141+
* @param lf
142+
*/
143+
private void addResultCachingCallback(
144+
Checkable<K, V> target, ListenableFuture<V> lf) {
145+
Futures.addCallback(lf, new FutureCallback<V>() {
146+
@Override
147+
public void onSuccess(@Nullable V result) {
148+
synchronized (ThrottledAsyncChecker.this) {
149+
checksInProgress.remove(target);
150+
completedChecks.put(target, new LastCheckResult<>(
151+
result, timer.monotonicNow()));
152+
}
153+
}
154+
155+
@Override
156+
public void onFailure(@Nonnull Throwable t) {
157+
synchronized (ThrottledAsyncChecker.this) {
158+
checksInProgress.remove(target);
159+
completedChecks.put(target, new LastCheckResult<>(
160+
t, timer.monotonicNow()));
161+
}
162+
}
163+
});
164+
}
165+
166+
/**
167+
* {@inheritDoc}.
168+
*/
169+
@Override
170+
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
171+
throws InterruptedException {
172+
// Try orderly shutdown.
173+
executorService.shutdown();
174+
175+
if (!executorService.awaitTermination(timeout, timeUnit)) {
176+
// Interrupt executing tasks and wait again.
177+
executorService.shutdownNow();
178+
executorService.awaitTermination(timeout, timeUnit);
179+
}
180+
}
181+
182+
/**
183+
* Status of running a check. It can either be a result or an
184+
* exception, depending on whether the check completed or threw.
185+
*/
186+
private static final class LastCheckResult<V> {
187+
/**
188+
* Timestamp at which the check completed.
189+
*/
190+
private final long completedAt;
191+
192+
/**
193+
* Result of running the check if it completed. null if it threw.
194+
*/
195+
@Nullable
196+
private final V result;
197+
198+
/**
199+
* Exception thrown by the check. null if it returned a result.
200+
*/
201+
private final Throwable exception; // null on success.
202+
203+
/**
204+
* Initialize with a result.
205+
* @param result
206+
*/
207+
private LastCheckResult(V result, long completedAt) {
208+
this.result = result;
209+
this.exception = null;
210+
this.completedAt = completedAt;
211+
}
212+
213+
/**
214+
* Initialize with an exception.
215+
* @param completedAt
216+
* @param t
217+
*/
218+
private LastCheckResult(Throwable t, long completedAt) {
219+
this.result = null;
220+
this.exception = t;
221+
this.completedAt = completedAt;
222+
}
223+
}
224+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* Datanode support for running disk checks.
21+
*/
22+
@InterfaceAudience.LimitedPrivate({"HDFS"})
23+
@InterfaceStability.Evolving
24+
package org.apache.hadoop.hdfs.server.datanode.checker;
25+
import org.apache.hadoop.classification.InterfaceAudience;
26+
import org.apache.hadoop.classification.InterfaceStability;

0 commit comments

Comments
 (0)