HDFS-7833. DataNode reconfiguration does not recalculate valid volumes required, based on configured failed volumes tolerated. Contributed by Lei (Eddy) Xu.

This commit is contained in:
cnauroth 2015-05-06 21:11:30 -07:00
parent 4c7b9b6abe
commit 6633a8474d
3 changed files with 105 additions and 10 deletions

View File

@ -638,6 +638,10 @@ Release 2.8.0 - UNRELEASED
HDFS-2484. checkLease should throw FileNotFoundException when file does HDFS-2484. checkLease should throw FileNotFoundException when file does
not exist. (Rakesh R via shv) not exist. (Rakesh R via shv)
HDFS-7833. DataNode reconfiguration does not recalculate valid volumes
required, based on configured failed volumes tolerated.
(Lei (Eddy) Xu via cnauroth)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -238,7 +238,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final Daemon lazyWriter; final Daemon lazyWriter;
final FsDatasetCache cacheManager; final FsDatasetCache cacheManager;
private final Configuration conf; private final Configuration conf;
private final int validVolsRequired; private final int volFailuresTolerated;
private volatile boolean fsRunning; private volatile boolean fsRunning;
final ReplicaMap volumeMap; final ReplicaMap volumeMap;
@ -269,7 +269,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.smallBufferSize = DFSUtil.getSmallBufferSize(conf); this.smallBufferSize = DFSUtil.getSmallBufferSize(conf);
// The number of volumes required for operation is the total number // The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate. // of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated = volFailuresTolerated =
conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, conf.getInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT); DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
@ -280,7 +280,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length; int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
int volsFailed = volumeFailureInfos.size(); int volsFailed = volumeFailureInfos.size();
this.validVolsRequired = volsConfigured - volFailuresTolerated;
if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) { if (volFailuresTolerated < 0 || volFailuresTolerated >= volsConfigured) {
throw new DiskErrorException("Invalid value configured for " throw new DiskErrorException("Invalid value configured for "
@ -543,7 +542,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public boolean hasEnoughResource() { public boolean hasEnoughResource() {
return volumes.getVolumes().size() >= validVolsRequired; return getNumFailedVolumes() <= volFailuresTolerated;
} }
/** /**

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -219,12 +220,7 @@ public class TestDataNodeVolumeFailure {
File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1)); File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
DataNodeTestUtils.injectDataDirFailure(dn0Vol1); DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
DataNode dn0 = cluster.getDataNodes().get(0); DataNode dn0 = cluster.getDataNodes().get(0);
long lastDiskErrorCheck = dn0.getLastDiskErrorCheck(); checkDiskErrorSync(dn0);
dn0.checkDiskErrorAsync();
// Wait checkDiskError thread finish to discover volume failure.
while (dn0.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
}
// Verify dn0Vol1 has been completely removed from DN0. // Verify dn0Vol1 has been completely removed from DN0.
// 1. dn0Vol1 is removed from DataStorage. // 1. dn0Vol1 is removed from DataStorage.
@ -270,6 +266,102 @@ public class TestDataNodeVolumeFailure {
assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath())); assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
} }
private static void checkDiskErrorSync(DataNode dn)
throws InterruptedException {
final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
dn.checkDiskErrorAsync();
// Wait 10 seconds for checkDiskError thread to finish and discover volume
// failures.
int count = 100;
while (count > 0 && dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
count--;
}
assertTrue("Disk checking thread does not finish in 10 seconds",
count > 0);
}
/**
* Test DataNode stops when the number of failed volumes exceeds
* dfs.datanode.failed.volumes.tolerated .
*/
@Test(timeout=10000)
public void testDataNodeShutdownAfterNumFailedVolumeExceedsTolerated()
throws InterruptedException, IOException {
// make both data directories to fail on dn0
final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
DataNodeTestUtils.injectDataDirFailure(dn0Vol1, dn0Vol2);
DataNode dn0 = cluster.getDataNodes().get(0);
checkDiskErrorSync(dn0);
// DN0 should stop after the number of failure disks exceed tolerated
// value (1).
assertFalse(dn0.shouldRun());
}
/**
* Test that DN does not shutdown, as long as failure volumes being hot swapped.
*/
@Test
public void testVolumeFailureRecoveredByHotSwappingVolume()
throws InterruptedException, ReconfigurationException, IOException {
final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
final DataNode dn0 = cluster.getDataNodes().get(0);
final String oldDataDirs = dn0.getConf().get(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
// Fail dn0Vol1 first.
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
checkDiskErrorSync(dn0);
// Hot swap out the failure volume.
String dataDirs = dn0Vol2.getPath();
dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
dataDirs);
// Fix failure volume dn0Vol1 and remount it back.
DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
oldDataDirs);
// Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
// resources, thus it should keep running.
DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
checkDiskErrorSync(dn0);
assertTrue(dn0.shouldRun());
}
/**
* Test changing the number of volumes does not impact the disk failure
* tolerance.
*/
@Test
public void testTolerateVolumeFailuresAfterAddingMoreVolumes()
throws InterruptedException, ReconfigurationException, IOException {
final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
final File dn0VolNew = new File(dataDir, "data_new");
final DataNode dn0 = cluster.getDataNodes().get(0);
final String oldDataDirs = dn0.getConf().get(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
// Add a new volume to DN0
dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
oldDataDirs + "," + dn0VolNew.getAbsolutePath());
// Fail dn0Vol1 first and hot swap it.
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
checkDiskErrorSync(dn0);
assertTrue(dn0.shouldRun());
// Fail dn0Vol2, now dn0 should stop, because we only tolerate 1 disk failure.
DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
checkDiskErrorSync(dn0);
assertFalse(dn0.shouldRun());
}
/** /**
* Test that there are under replication blocks after vol failures * Test that there are under replication blocks after vol failures
*/ */