HDFS-11340. DataNode reconfigure for disks doesn't remove the failed volumes. (Manoj Govindassamy via lei)

Lei Xu committed 2017-03-10 14:36:51 -08:00
parent 9649c27864
commit 6d356b6b4d
4 changed files with 184 additions and 32 deletions
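
In effect, parseChangedVolumes() now also walks the DataNode's failed storage locations: a reconfigure whose new dfs.datanode.data.dir no longer lists an already-failed volume puts that volume on the deactivate list, and removeVolumes()/FsVolumeList then clear its entry from the failed-volume accounting (NumFailedVolumes, FailedStorageLocations). Below is a minimal sketch of the operator-visible behavior, using only APIs touched by this patch; the method name and directory paths are illustrative, not part of the change.

    // Hedged sketch (not from the patch): hot swap out an already-failed volume
    // and expect the DataNode to stop reporting it as failed.
    // Assumes "dn" is a running DataNode whose /data/disk1/dn volume has failed
    // and /data/disk2/dn is still healthy.
    void hotSwapOutFailedVolume(DataNode dn) throws Exception {
      // Reconfigure with only the healthy directory; the failed one is dropped
      // from dfs.datanode.data.dir, so parseChangedVolumes() marks it for
      // deactivation even though its StorageDirectory is already gone.
      dn.reconfigurePropertyImpl(
          DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, "/data/disk2/dn");
      // Before HDFS-11340 the failed volume lingered in the failure summary;
      // with this change the counters are cleared.
      assert dn.getFSDataset().getNumFailedVolumes() == 0;
    }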

DataNode.java

@@ -652,48 +652,84 @@ public class DataNode extends ReconfigurableBase
   ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
     Configuration conf = new Configuration();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    List<StorageLocation> locations = getStorageLocations(conf);
+    List<StorageLocation> newStorageLocations = getStorageLocations(conf);
 
-    if (locations.isEmpty()) {
+    if (newStorageLocations.isEmpty()) {
       throw new IOException("No directory is specified.");
     }
 
-    // Use the existing StorageLocation to detect storage type changes.
-    Map<String, StorageLocation> existingLocations = new HashMap<>();
+    // Use the existing storage locations from the current conf
+    // to detect new storage additions or removals.
+    Map<String, StorageLocation> existingStorageLocations = new HashMap<>();
     for (StorageLocation loc : getStorageLocations(getConf())) {
-      existingLocations.put(loc.getNormalizedUri().toString(), loc);
+      existingStorageLocations.put(loc.getNormalizedUri().toString(), loc);
     }
 
     ChangedVolumes results = new ChangedVolumes();
-    results.newLocations.addAll(locations);
+    results.newLocations.addAll(newStorageLocations);
 
     for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
          it.hasNext(); ) {
       Storage.StorageDirectory dir = it.next();
       boolean found = false;
-      for (Iterator<StorageLocation> sl = results.newLocations.iterator();
-           sl.hasNext(); ) {
-        StorageLocation location = sl.next();
-        if (location.matchesStorageDirectory(dir)) {
-          sl.remove();
-          StorageLocation old = existingLocations.get(
-              location.getNormalizedUri().toString());
-          if (old != null &&
-              old.getStorageType() != location.getStorageType()) {
+      for (Iterator<StorageLocation> newLocationItr =
+          results.newLocations.iterator(); newLocationItr.hasNext();) {
+        StorageLocation newLocation = newLocationItr.next();
+        if (newLocation.matchesStorageDirectory(dir)) {
+          StorageLocation oldLocation = existingStorageLocations.get(
+              newLocation.getNormalizedUri().toString());
+          if (oldLocation != null &&
+              oldLocation.getStorageType() != newLocation.getStorageType()) {
             throw new IOException("Changing storage type is not allowed.");
           }
-          results.unchangedLocations.add(location);
+          // Update the unchanged locations as this location
+          // from the new conf is really not a new one.
+          newLocationItr.remove();
+          results.unchangedLocations.add(newLocation);
           found = true;
           break;
         }
       }
 
+      // New conf doesn't have the storage location which is available in
+      // the current storage locations. Add to the deactivateLocations list.
       if (!found) {
+        LOG.info("Deactivation request received for active volume: "
+            + dir.getRoot().toString());
         results.deactivateLocations.add(
             StorageLocation.parse(dir.getRoot().toString()));
       }
     }
 
+    // Use the failed storage locations from the current conf
+    // to detect removals in the new conf.
+    if (getFSDataset().getNumFailedVolumes() > 0) {
+      for (String failedStorageLocation : getFSDataset()
+          .getVolumeFailureSummary().getFailedStorageLocations()) {
+        boolean found = false;
+        for (Iterator<StorageLocation> newLocationItr =
+            results.newLocations.iterator(); newLocationItr.hasNext();) {
+          StorageLocation newLocation = newLocationItr.next();
+          if (newLocation.getNormalizedUri().toString().equals(
+              failedStorageLocation)) {
+            // The failed storage is being re-added. DataNode#refreshVolumes()
+            // will take care of re-assessing it.
+            found = true;
+            break;
+          }
+        }
+
+        // New conf doesn't have this failed storage location.
+        // Add to the deactivate locations list.
+        if (!found) {
+          LOG.info("Deactivation request received for failed volume: "
+              + failedStorageLocation);
+          results.deactivateLocations.add(StorageLocation.parse(
+              failedStorageLocation));
+        }
+      }
+    }
+
     return results;
   }
@@ -716,8 +752,9 @@ public class DataNode extends ReconfigurableBase
     }
 
     try {
-      if (numOldDataDirs + changedVolumes.newLocations.size() -
-          changedVolumes.deactivateLocations.size() <= 0) {
+      if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
+          + changedVolumes.newLocations.size()
+          - changedVolumes.deactivateLocations.size() <= 0) {
         throw new IOException("Attempt to remove all volumes.");
       }
       if (!changedVolumes.newLocations.isEmpty()) {

FsDatasetImpl.java

@@ -502,8 +502,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    */
   @Override
   public void removeVolumes(
-      Collection<StorageLocation> storageLocationsToRemove,
+      final Collection<StorageLocation> storageLocsToRemove,
       boolean clearFailure) {
+    Collection<StorageLocation> storageLocationsToRemove =
+        new ArrayList<>(storageLocsToRemove);
     Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
     List<String> storageToRemove = new ArrayList<>();
     try (AutoCloseableLock lock = datasetLock.acquire()) {
@@ -541,6 +543,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
           storageToRemove.add(sd.getStorageUuid());
+          storageLocationsToRemove.remove(sdLocation);
+        }
+      }
+
+      // A reconfigure can remove the storage location which is already
+      // removed when the failure was detected by DataNode#checkDiskErrorAsync.
+      // Now, let's remove this from the failed volume list.
+      if (clearFailure) {
+        for (StorageLocation storageLocToRemove : storageLocationsToRemove) {
+          volumes.removeVolumeFailureInfo(storageLocToRemove);
         }
       }
 
       setupAsyncLazyPersistThreads();

FsVolumeList.java

@@ -371,9 +371,16 @@ class FsVolumeList {
   }
 
   void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
-    volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
-        volumeFailureInfo);
+    // There could be redundant requests for adding the same failed
+    // volume because of repeated DataNode reconfigure with same list
+    // of volumes. Ignoring update on failed volume so as to preserve
+    // old failed capacity details in the map.
+    if (!volumeFailureInfos.containsKey(volumeFailureInfo
+        .getFailedStorageLocation())) {
+      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
+          volumeFailureInfo);
+    }
   }
 
   private void addVolumeFailureInfo(FsVolumeImpl vol) {
     addVolumeFailureInfo(new VolumeFailureInfo(
@@ -382,7 +389,7 @@ class FsVolumeList {
         vol.getCapacity()));
   }
 
-  private void removeVolumeFailureInfo(StorageLocation location) {
+  void removeVolumeFailureInfo(StorageLocation location) {
     volumeFailureInfos.remove(location);
   }

TestDataNodeVolumeFailureReporting.java

@@ -25,16 +25,22 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -51,6 +57,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -406,8 +413,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -415,9 +422,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Reconfigure a third time with the failed volumes. Afterwards, we expect
     // the same volume failures to be reported. (No double-counting.)
@@ -427,8 +434,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -436,9 +443,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Replace failed volume with healthy volume and run reconfigure DataNode.
     // The failed volume information should be cleared.
@@ -514,6 +521,95 @@ public class TestDataNodeVolumeFailureReporting {
         currentVersion.exists());
   }
 
+  /**
+   * Verify DataNode NumFailedVolumes and FailedStorageLocations
+   * after hot swap out of failed volume.
+   */
+  @Test
+  public void testHotSwapOutFailedVolumeAndReporting()
+      throws Exception {
+    final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
+    final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
+    final DataNode dn0 = cluster.getDataNodes().get(0);
+    final String oldDataDirs = dn0.getConf().get(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service=DataNode,name=FSDatasetState-" + dn0.getDatanodeUuid());
+    int numFailedVolumes = (int) mbs.getAttribute(mxbeanName,
+        "NumFailedVolumes");
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, false, new String[] {});
+
+    // Fail dn0Vol1 first.
+    // Verify NumFailedVolumes and FailedStorageLocations are empty.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol1));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Reconfigure disks without fixing the failed disk.
+    // Verify NumFailedVolumes and FailedStorageLocations haven't changed.
+    try {
+      dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+          oldDataDirs);
+      fail("Reconfigure with failed disk should throw exception.");
+    } catch (ReconfigurationException e) {
+      Assert.assertTrue("Reconfigure exception doesn't have expected path!",
+          e.getCause().getMessage().contains(dn0Vol1.getAbsolutePath()));
+    }
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Hot swap out the failed volume.
+    // Verify NumFailedVolumes and FailedStorageLocations are reset.
+    String dataDirs = dn0Vol2.getPath();
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        dataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fix failure volume dn0Vol1 and remount it back.
+    // Verify NumFailedVolumes and FailedStorageLocations are empty.
+    DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        oldDataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fail dn0Vol2.
+    // Verify NumFailedVolumes and FailedStorageLocations are updated.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol2));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol2.getAbsolutePath()});
+
+    // Verify DataNode tolerating one disk failure.
+    assertTrue(dn0.shouldRun());
+  }
+
   /**
    * Checks the NameNode for correct values of aggregate counters tracking failed
    * volumes across all DataNodes.