From ee1d3105c216b22e10a46233b9dacc0b80a87aaf Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Fri, 10 Mar 2017 10:44:04 -0800
Subject: [PATCH] HDFS-11340. DataNode reconfigure for disks doesn't remove
 the failed volumes. (Manoj Govindassamy via lei)

---
 .../hadoop/hdfs/server/datanode/DataNode.java |  74 ++++++++---
 .../fsdataset/impl/FsDatasetImpl.java         |  19 ++-
 .../datanode/fsdataset/impl/FsVolumeList.java |  13 +-
 .../server/datanode/DataNodeTestUtils.java    |  25 ++++
 .../TestDataNodeVolumeFailureReporting.java   | 125 ++++++++++++++++--
 5 files changed, 222 insertions(+), 34 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c7839565f4d..31f25af7717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -650,49 +650,86 @@ public class DataNode extends ReconfigurableBase
   ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
     Configuration conf = new Configuration();
     conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
-    List<StorageLocation> locations = getStorageLocations(conf);
+    List<StorageLocation> newStorageLocations = getStorageLocations(conf);
 
-    if (locations.isEmpty()) {
+    if (newStorageLocations.isEmpty()) {
       throw new IOException("No directory is specified.");
     }
 
-    // Use the existing StorageLocation to detect storage type changes.
-    Map<String, StorageLocation> existingLocations = new HashMap<>();
+    // Use the existing storage locations from the current conf
+    // to detect new storage additions or removals.
+    Map<String, StorageLocation> existingStorageLocations = new HashMap<>();
     for (StorageLocation loc : getStorageLocations(getConf())) {
-      existingLocations.put(loc.getFile().getCanonicalPath(), loc);
+      existingStorageLocations.put(loc.getFile().getCanonicalPath(), loc);
     }
 
     ChangedVolumes results = new ChangedVolumes();
-    results.newLocations.addAll(locations);
+    results.newLocations.addAll(newStorageLocations);
 
     for (Iterator<Storage.StorageDirectory> it = storage.dirIterator();
-         it.hasNext(); ) {
+         it.hasNext();) {
       Storage.StorageDirectory dir = it.next();
       boolean found = false;
-      for (Iterator<StorageLocation> sl = results.newLocations.iterator();
-           sl.hasNext(); ) {
-        StorageLocation location = sl.next();
-        if (location.getFile().getCanonicalPath().equals(
+      for (Iterator<StorageLocation> newLocationItr =
+          results.newLocations.iterator();
+          newLocationItr.hasNext();) {
+        StorageLocation newLocation = newLocationItr.next();
+        if (newLocation.getFile().getCanonicalPath().equals(
             dir.getRoot().getCanonicalPath())) {
-          sl.remove();
-          StorageLocation old = existingLocations.get(
-              location.getFile().getCanonicalPath());
+          StorageLocation old = existingStorageLocations.get(
+              newLocation.getFile().getCanonicalPath());
           if (old != null &&
-              old.getStorageType() != location.getStorageType()) {
+              old.getStorageType() != newLocation.getStorageType()) {
             throw new IOException("Changing storage type is not allowed.");
           }
-          results.unchangedLocations.add(location);
+          // Update the unchanged locations as this location
+          // from the new conf is really not a new one.
+          newLocationItr.remove();
+          results.unchangedLocations.add(newLocation);
           found = true;
           break;
         }
       }
 
+      // New conf doesn't have the storage location which is available in
+      // the current storage locations. Add it to the deactivateLocations list.
       if (!found) {
+        LOG.info("Deactivation request received for active volume: "
+            + dir.getRoot().toString());
         results.deactivateLocations.add(
             StorageLocation.parse(dir.getRoot().toString()));
       }
     }
 
+    // Use the failed storage locations from the current conf
+    // to detect removals in the new conf.
+    if (getFSDataset().getNumFailedVolumes() > 0) {
+      for (String failedStorageLocation : getFSDataset()
+          .getVolumeFailureSummary().getFailedStorageLocations()) {
+        boolean found = false;
+        for (Iterator<StorageLocation> newLocationItr =
+            results.newLocations.iterator(); newLocationItr.hasNext();) {
+          StorageLocation newLocation = newLocationItr.next();
+          if (newLocation.getFile().getCanonicalPath().equals(
+              failedStorageLocation)) {
+            // The failed storage is being re-added. DataNode#refreshVolumes()
+            // will take care of re-assessing it.
+            found = true;
+            break;
+          }
+        }
+
+        // New conf doesn't have this failed storage location.
+        // Add it to the deactivate locations list.
+        if (!found) {
+          LOG.info("Deactivation request received for failed volume: "
+              + failedStorageLocation);
+          results.deactivateLocations.add(StorageLocation.parse(
+              failedStorageLocation));
+        }
+      }
+    }
+
     return results;
   }
 
@@ -715,8 +752,9 @@ public class DataNode extends ReconfigurableBase
     }
 
     try {
-      if (numOldDataDirs + changedVolumes.newLocations.size() -
-          changedVolumes.deactivateLocations.size() <= 0) {
+      if (numOldDataDirs + getFSDataset().getNumFailedVolumes()
+          + changedVolumes.newLocations.size()
+          - changedVolumes.deactivateLocations.size() <= 0) {
         throw new IOException("Attempt to remove all volumes.");
       }
       if (!changedVolumes.newLocations.isEmpty()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ca1177629d4..dcfe3500b80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -523,9 +523,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @param clearFailure set true to clear failure information.
    */
   @Override
-  public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
+  public void removeVolumes(Set<File> storageLocsToRemove,
+      boolean clearFailure) {
+    Collection<File> storageLocationsToRemove =
+        new ArrayList<>(storageLocsToRemove);
     // Make sure that all volumes are absolute path.
-    for (File vol : volumesToRemove) {
+    for (File vol : storageLocationsToRemove) {
       Preconditions.checkArgument(vol.isAbsolute(),
           String.format("%s is not absolute path.", vol.getPath()));
     }
@@ -536,7 +539,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
       Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
       final File absRoot = sd.getRoot().getAbsoluteFile();
-      if (volumesToRemove.contains(absRoot)) {
+      if (storageLocationsToRemove.contains(absRoot)) {
         LOG.info("Removing " + absRoot + " from FsDataset.");
 
         // Disable the volume from the service.
@@ -563,6 +566,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
 
         storageToRemove.add(sd.getStorageUuid());
+        storageLocationsToRemove.remove(absRoot);
+      }
+    }
+
+    // A reconfigure can remove the storage location which is already
+    // removed when the failure was detected by DataNode#checkDiskErrorAsync.
+    // Now, let's remove it from the failed volume list.
+    if (clearFailure) {
+      for (File storageLocToRemove : storageLocationsToRemove) {
+        volumes.removeVolumeFailureInfo(storageLocToRemove);
       }
     }
     setupAsyncLazyPersistThreads();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 8f84bf28a0a..29bc4c9f540 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -369,8 +369,15 @@ class FsVolumeList {
   }
 
   void addVolumeFailureInfo(VolumeFailureInfo volumeFailureInfo) {
-    volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
-        volumeFailureInfo);
+    // There could be redundant requests for adding the same failed
+    // volume because of repeated DataNode reconfigures with the same
+    // list of volumes. Ignore the update for an already failed volume
+    // so as to preserve its old failed capacity details in the map.
+    if (!volumeFailureInfos.containsKey(volumeFailureInfo
+        .getFailedStorageLocation())) {
+      volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
+          volumeFailureInfo);
+    }
   }
 
   private void addVolumeFailureInfo(FsVolumeImpl vol) {
@@ -380,7 +387,7 @@ class FsVolumeList {
         vol.getCapacity()));
   }
 
-  private void removeVolumeFailureInfo(File vol) {
+  void removeVolumeFailureInfo(File vol) {
     volumeFailureInfos.remove(vol.getAbsolutePath());
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index d35bf05620c..b99e90364f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import java.io.File;
 import java.io.IOException;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.test.GenericTestUtils;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -228,4 +230,27 @@ public class DataNodeTestUtils {
     }
     return null;
   }
+
+  /**
+   * Call and wait for the DataNode to detect the disk failure.
+   *
+   * @param dn the DataNode to check
+   * @param volume the volume expected to be detected as failed
+   * @throws Exception
+   */
+  public static void waitForDiskError(final DataNode dn, FsVolumeSpi volume)
+      throws Exception {
+    LOG.info("Starting to wait for datanode to detect disk failure.");
+    final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
+    dn.checkDiskErrorAsync(volume);
+    // Wait 10 seconds for checkDiskError thread to finish and discover volume
+    // failures.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        return dn.getLastDiskErrorCheck() != lastDiskErrorCheck;
+      }
+    }, 100, 10000);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
index 5191083c043..09c6b0b399f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java
@@ -25,12 +25,18 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -47,6 +53,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -396,8 +403,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -405,9 +412,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Reconfigure a third time with the failed volumes. Afterwards, we expect
     // the same volume failures to be reported. (No double-counting.)
@@ -417,8 +424,8 @@ public class TestDataNodeVolumeFailureReporting {
     DataNodeTestUtils.triggerHeartbeat(dns.get(0));
     DataNodeTestUtils.triggerHeartbeat(dns.get(1));
 
-    checkFailuresAtDataNode(dns.get(0), 1, false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtDataNode(dns.get(1), 1, false, dn2Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(0), 1, true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtDataNode(dns.get(1), 1, true, dn2Vol1.getAbsolutePath());
 
     // Ensure we wait a sufficient amount of time.
     assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
@@ -426,9 +433,9 @@ public class TestDataNodeVolumeFailureReporting {
     // The NN reports two volume failures again.
     DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
         origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
-    checkAggregateFailuresAtNameNode(false, 2);
-    checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
-    checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
+    checkAggregateFailuresAtNameNode(true, 2);
+    checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
+    checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
 
     // Replace failed volume with healthy volume and run reconfigure DataNode.
     // The failed volume information should be cleared.
@@ -503,6 +510,104 @@ public class TestDataNodeVolumeFailureReporting {
         currentVersion.exists());
   }
 
+  /**
+   * Verify DataNode NumFailedVolumes and FailedStorageLocations
+   * after hot swap out of failed volume.
+   */
+  @Test (timeout = 120000)
+  public void testHotSwapOutFailedVolumeAndReporting()
+      throws Exception {
+    LOG.info("Starting testHotSwapOutFailedVolumeAndReporting!");
+    final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
+    final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
+    final DataNode dn0 = cluster.getDataNodes().get(0);
+    final String oldDataDirs = dn0.getConf().get(
+        DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+
+    final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+    final ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service=DataNode,name=FSDatasetState-" + dn0.getDatanodeUuid());
+    int numFailedVolumes = (int) mbs.getAttribute(mxbeanName,
+        "NumFailedVolumes");
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, false, new String[] {});
+
+    // Fail dn0Vol1 first.
+    // Verify NumFailedVolumes and FailedStorageLocations are updated.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol1));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Reconfigure disks without fixing the failed disk.
+    // Verify NumFailedVolumes and FailedStorageLocations haven't changed.
+    try {
+      dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+          oldDataDirs);
+    } catch (ReconfigurationException e) {
+      Assert.assertTrue("Reconfigure exception doesn't have expected path!",
+          e.getCause().getMessage().contains(dn0Vol1.getAbsolutePath()));
+    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        try {
+          return ((int) mbs.getAttribute(mxbeanName,
+              "NumFailedVolumes") == 1);
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 1000, 30000);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol1.getAbsolutePath()});
+
+    // Hot swap out the failed volume.
+    // Verify NumFailedVolumes and FailedStorageLocations are reset.
+    String dataDirs = dn0Vol2.getPath();
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        dataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fix the failed volume dn0Vol1 and remount it back.
+    // Verify NumFailedVolumes and FailedStorageLocations are empty.
+    DataNodeTestUtils.restoreDataDirFromFailure(dn0Vol1);
+    dn0.reconfigurePropertyImpl(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        oldDataDirs);
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(0, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 0, true, new String[] {});
+
+    // Fail dn0Vol2.
+    // Verify NumFailedVolumes and FailedStorageLocations are updated.
+    DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
+    DataNodeTestUtils.waitForDiskError(dn0,
+        DataNodeTestUtils.getVolume(dn0, dn0Vol2));
+    numFailedVolumes = (int) mbs.getAttribute(mxbeanName, "NumFailedVolumes");
+    Assert.assertEquals(1, numFailedVolumes);
+    Assert.assertEquals(dn0.getFSDataset().getNumFailedVolumes(),
+        numFailedVolumes);
+    checkFailuresAtDataNode(dn0, 1, true,
+        new String[] {dn0Vol2.getAbsolutePath()});
+
+    // Verify that the DataNode is tolerating one disk failure.
+    assertTrue(dn0.shouldRun());
+  }
+
   /**
    * Checks the NameNode for correct values of aggregate counters tracking failed
    * volumes across all DataNodes.