HDFS-17352. Add configuration to control whether DN delete this replica from disk when client requests a missing block #6559

Open · wants to merge 5 commits into trunk
@@ -188,6 +188,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS =
600000;

public static final String DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE =
    "dfs.datanode.delete.corrupt.replica.from.disk.enable";
public static final boolean DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT = true;

public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
    "dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;

Contributor: It would be handy if this could be configured dynamically.

Contributor (author): Thanks @tomscut for your review. I will support dynamic configuration later.
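
The key/default pair above is read once when the dataset is constructed (see the FsDatasetImpl constructor change later in this diff), while the runtime toggle is added in the DataNode changes below. A minimal sketch of that startup read, assuming conf is the DataNode configuration:

// Minimal sketch: read the flag once at startup (conf is assumed to be the DataNode configuration).
boolean deleteCorruptReplicaFromDisk = conf.getBoolean(
    DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE,
    DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT);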
@@ -40,6 +40,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
@@ -374,7 +376,8 @@ public class DataNode extends ReconfigurableBase
DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY,
DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY));
DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE));

public static final String METRICS_LOG_NAME = "DataNodeMetricsLog";

@@ -740,6 +743,8 @@ public String reconfigurePropertyImpl(String property, String newVal)
return reconfDiskBalancerParameters(property, newVal);
case DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY:
return reconfSlowIoWarningThresholdParameters(property, newVal);
case DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE:
return reconfDeleteCorruptReplicaFromDiskParameter(property, newVal);
default:
break;
}
@@ -1079,6 +1084,26 @@ private String reconfSlowIoWarningThresholdParameters(String property, String ne
}
}

private String reconfDeleteCorruptReplicaFromDiskParameter(String property, String newVal)
throws ReconfigurationException {
String result;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
if (newVal != null && !newVal.equalsIgnoreCase("true")
&& !newVal.equalsIgnoreCase("false")) {
throw new IllegalArgumentException("Not a valid Boolean value for " + property);
}
boolean enable = (newVal == null ? DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT :
Boolean.parseBoolean(newVal));
data.setDeleteCorruptReplicaFromDisk(enable);
result = Boolean.toString(enable);
LOG.info("RECONFIGURE* changed {} to {}", property, result);
return result;
} catch (IllegalArgumentException e) {
throw new ReconfigurationException(property, newVal, getConf().get(property), e);
}
}
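
Because the new key is registered in RECONFIGURABLE_PROPERTIES and routed to the handler above, the flag can be flipped without a DataNode restart (operationally through the dfsadmin reconfiguration workflow). A minimal in-process sketch, assuming a DataNode handle dn like the one used by the reconfiguration test later in this PR; the helper method name is illustrative only:

// Sketch only: toggle the flag at runtime and check that it took effect on the live dataset.
void disableDeleteCorruptReplicaFromDisk(DataNode dn) throws ReconfigurationException {
  dn.reconfigureProperty(
      DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE, "false");
  assert !dn.getFSDataset().isDeleteCorruptReplicaFromDisk();
}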

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
@@ -698,4 +698,19 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
* @param time the last time in milliseconds when the directory scanner successfully ran.
*/
default void setLastDirScannerFinishTime(long time) {}

/**
* Set to enable or disable the deletion of corrupt replicas from disk
* when a client requests a missing block.
* @param deleteCorruptReplicaFromDisk
*        if true, the actual on-disk block and meta file are deleted;
*        if false, the replica is only removed from the volume map.
*/
void setDeleteCorruptReplicaFromDisk(boolean deleteCorruptReplicaFromDisk);

/**
* Check whether the datanode deletes the replica from disk when a client requests a missing block.
* @return true if on-disk deletion is enabled, false otherwise.
*/
boolean isDeleteCorruptReplicaFromDisk();
}
@@ -129,6 +129,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE;

/**************************************************
* FSDataset manages a set of data blocks. Each block
* has a unique name and an extent on disk.
@@ -287,6 +290,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
private long curDirScannerNotifyCount;
private long lastDirScannerNotifyTime;
private volatile long lastDirScannerFinishTime;
private volatile boolean deleteCorruptReplicaFromDisk;

/**
* An FSDataset has a directory where it loads its data files.
@@ -392,6 +396,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_KEY,
DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_MAX_NOTIFY_COUNT_DEFAULT);
lastDirScannerNotifyTime = System.currentTimeMillis();
deleteCorruptReplicaFromDisk = conf.getBoolean(
DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE,
DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT);
}

/**
@@ -2400,37 +2407,43 @@ public void invalidate(String bpid, ReplicaInfo block) {
}
/**
* Invalidate a block which is not found on disk. We should remove it from
* memory and notify namenode, but unnecessary to delete the actual on-disk
* block file again.
* memory and notify the namenode; whether to also delete the actual on-disk block and meta
* file is decided by {@link DFSConfigKeys#DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE}.
*
* @param bpid the block pool ID.
* @param block The block to be invalidated.
* @param checkFiles Whether to check data and meta files.
*/
public void invalidateMissingBlock(String bpid, Block block, boolean checkFiles) {
public void invalidateMissingBlock(String bpid, Block block, boolean checkFiles)
throws IOException {

// The replica seems is on its volume map but not on disk.
// We can't confirm here is block file lost or disk failed.
// If block lost:
// deleted local block file is completely unnecessary
// If disk failed:
// deleted local block file here may lead to missing-block
// when it with only 1 replication left now.
// So remove if from volume map notify namenode is ok.
// If checkFiles is true, the existence of the block and metafile will be checked again.
// If deleteCorruptReplicaFromDisk is true, delete the existing block or metafile directly,
// otherwise just remove them from the memory volumeMap.
try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.BLOCK_POOl,
bpid)) {
// Check if this block is on the volume map.
ReplicaInfo replica = volumeMap.get(bpid, block);
// Double-check block or meta file existence when checkFiles as true.
if (replica != null && (!checkFiles ||
(!replica.blockDataExists() || !replica.metadataExists()))) {
volumeMap.remove(bpid, block);
invalidate(bpid, replica);
if (deleteCorruptReplicaFromDisk) {
ExtendedBlock extendedBlock = new ExtendedBlock(bpid, block);
datanode
.notifyNamenodeDeletedBlock(extendedBlock, replica.getStorageUuid());
invalidate(bpid, new Block[] {extendedBlock.getLocalBlock()});
} else {
// For detailed info, please refer to HDFS-16985.
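// The block file may only be temporarily unreadable (for example a failed or
// ejected disk), so keep the on-disk files: deleting them here could turn a
// transient failure into a permanent missing block when this is the last replica.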
volumeMap.remove(bpid, block);
invalidate(bpid, replica);
}
}
}
}

Contributor: Please add some comments to describe the necessity of the else logic.

Contributor (author): The patch has been modified based on the comments.

Hi @ZanderXu, please help review it again, thanks~

public void invalidateMissingBlock(String bpid, Block block) {
public void invalidateMissingBlock(String bpid, Block block) throws IOException {
invalidateMissingBlock(bpid, block, true);
}
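
The DataNode read-path call sites that reach invalidateMissingBlock are not shown in this diff; the following is only a hedged sketch of the kind of caller the new behavior serves, assuming the missing files surface as a FileNotFoundException when the replica is opened (the wrapper method is illustrative, not existing HDFS code):

// Illustrative sketch only, not actual DataNode read-path code.
InputStream openForClientRead(FsDatasetImpl fsdataset, String bpid, Block block)
    throws IOException {
  try {
    return fsdataset.getBlockInputStream(new ExtendedBlock(bpid, block), 0);
  } catch (FileNotFoundException e) {
    // The replica is still in the volume map but its files are gone; the dataset
    // decides, per dfs.datanode.delete.corrupt.replica.from.disk.enable, whether to
    // also delete leftover on-disk files or only drop the replica from memory.
    fsdataset.invalidateMissingBlock(bpid, block);
    throw e;
  }
}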

@@ -3845,5 +3858,15 @@ public void setLastDirScannerFinishTime(long time) {
public long getPendingAsyncDeletions() {
return asyncDiskService.countPendingDeletions();
}

@Override
public void setDeleteCorruptReplicaFromDisk(boolean deleteCorruptReplicaFromDisk) {
this.deleteCorruptReplicaFromDisk = deleteCorruptReplicaFromDisk;
}

@Override
public boolean isDeleteCorruptReplicaFromDisk() {
return deleteCorruptReplicaFromDisk;
}
}

@@ -3982,6 +3982,17 @@
</description>
</property>

<property>
<name>dfs.datanode.delete.corrupt.replica.from.disk.enable</name>
<value>true</value>
<description>
Whether the datanode deletes the replica from disk when a client requests a missing block.
If true, the actual on-disk block and meta file will be deleted;
otherwise the replica will only be removed from the volume map and the namenode will be notified.
The default value is true.
</description>
</property>

Contributor: If the default value is true, there is a risk of missing blocks according to HDFS-16985. I suggest setting the default value to false, as a missing block is a more serious problem than a delayed deletion of the disk file. What's your opinion?

Contributor (author): Thanks @zhangshuyan0 for your comment.

From the DataNode's point of view, once it has been confirmed that the meta file or data file is lost, the replica should be deleted directly from memory and disk; this is the expected behavior.

For the case mentioned in HDFS-16985, if the cluster deployment adopts the AWS EC2 + EBS solution, dfs.datanode.delete.corrupt.replica.from.disk.enable can be set to false as needed to avoid deleting on-disk data.

So I think it might be better, from the DataNode's perspective, to default dfs.datanode.delete.corrupt.replica.from.disk.enable to true.

Looking forward to your suggestions again.

Contributor (author): Hi @zhangshuyan0, would you mind taking a look again? Thanks~

<property>
<name>dfs.webhdfs.rest-csrf.enabled</name>
<value>false</value>
@@ -1647,5 +1647,15 @@ public List<FsVolumeImpl> getVolumeList() {
public void setLastDirScannerFinishTime(long time) {
throw new UnsupportedOperationException();
}

@Override
public void setDeleteCorruptReplicaFromDisk(boolean deleteCorruptReplicaFromDisk) {
throw new UnsupportedOperationException();
}

@Override
public boolean isDeleteCorruptReplicaFromDisk() {
throw new UnsupportedOperationException();
}
}

@@ -34,6 +34,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_DEFAULT;
@@ -948,4 +950,32 @@ public void testSlowIoWarningThresholdReconfiguration() throws Exception {
}
}

@Test
public void testDeleteCorruptReplicaFromDiskParameter() throws Exception {
for (int i = 0; i < NUM_DATA_NODE; i++) {
DataNode dn = cluster.getDataNodes().get(i);

// Verify dfs.datanode.delete.corrupt.replica.from.disk.enable
// Try invalid values.
LambdaTestUtils.intercept(ReconfigurationException.class,
"Could not change property " +
"dfs.datanode.delete.corrupt.replica.from.disk.enable from 'true' to 'text'",
() -> dn.reconfigureProperty(DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE,
"text"));

// Set default value.
dn.reconfigureProperty(DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE, null);
assertEquals(dn.getConf().getBoolean(DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE,
DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_DEFAULT),
dn.getFSDataset().isDeleteCorruptReplicaFromDisk());

// Set dfs.datanode.delete.corrupt.replica.from.disk.enable to false.
dn.reconfigureProperty(DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE, "false");
assertFalse(dn.getFSDataset().isDeleteCorruptReplicaFromDisk());

// Set dfs.datanode.delete.corrupt.replica.from.disk.enable to true.
dn.reconfigureProperty(DFS_DATANODE_DELETE_CORRUPT_REPLICA_FROM_DISK_ENABLE, "true");
assertTrue(dn.getFSDataset().isDeleteCorruptReplicaFromDisk());
}
}
}
@@ -487,4 +487,14 @@ public long getLastDirScannerFinishTime() {
public long getPendingAsyncDeletions() {
return 0;
}

@Override
public void setDeleteCorruptReplicaFromDisk(boolean deleteCorruptReplicaFromDisk) {

}

@Override
public boolean isDeleteCorruptReplicaFromDisk() {
return false;
}
}
@@ -1956,10 +1956,13 @@ public void delayDeleteReplica() {
/**
* Test the case where the block file is not found because the disk hit some exception.
* We expect:
* If deleteCorruptReplicaFromDisk is false:
* 1. block file wouldn't be deleted from disk.
* 2. block info would be removed from dn memory.
* 3. block would be reported to nn as missing block.
* 4. block would be recovered when disk back to normal.
* If deleteCorruptReplicaFromDisk is true:
* 1. block file would be removed from dn memory and disk.
*/
@Test
public void testInvalidateMissingBlock() throws Exception {
@@ -1987,6 +1990,8 @@ public void testInvalidateMissingBlock() throws Exception {
File blockFile = new File(blockPath);
File metaFile = new File(metaPath);

// Setting deleteCorruptReplicaFromDisk to false will only remove the block from memory.
fsdataset.setDeleteCorruptReplicaFromDisk(false);
// Mock the local block file not being found when the disk hits an exception.
fsdataset.invalidateMissingBlock(bpid, replicaInfo, false);

@@ -2007,6 +2012,17 @@
fsdataset.checkAndUpdate(bpid, info);
GenericTestUtils.waitFor(() ->
blockManager.getLowRedundancyBlocksCount() == 0, 100, 5000);

assertTrue(blockFile.exists());
// Setting deleteCorruptReplicaFromDisk to true will remove the block from memory and disk.
fsdataset.setDeleteCorruptReplicaFromDisk(true);
// Mock the local block file not being found when the disk hits an exception.
fsdataset.invalidateMissingBlock(bpid, replicaInfo, false);

// Assert the local block file is removed from memory and disk.
GenericTestUtils.waitFor(() -> !blockFile.exists(), 100, 5000);
assertEquals("null",
fsdataset.getReplicaString(bpid, replicaInfo.getBlockId()));
} finally {
cluster.shutdown();
}
@@ -347,7 +347,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, Interr
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(26, outs.size());
assertEquals(27, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}
