diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e4800444873..3b8006a50ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -166,6 +166,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
@@ -1681,13 +1682,37 @@ public class DataNode extends ReconfigurableBase
     // Exclude failed disks before initializing the block pools to avoid startup
     // failures.
     checkDiskError();
-
-    data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
+    try {
+      data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
+    } catch (AddBlockPoolException e) {
+      handleAddBlockPoolError(e);
+    }
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(getConf());
     initDiskBalancer(data, getConf());
   }
 
+  /**
+   * Handles an AddBlockPoolException object thrown from
+   * {@link org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList#
+   * addBlockPool}. Will ensure that all volumes that encountered a failure
+   * during addBlockPool are removed from the DataNode and marked as failed
+   * volumes in the same way as a runtime volume failure.
+   *
+   * @param e this exception is a container for all IOException objects caught
+   *          in FsVolumeList#addBlockPool.
+   */
+  private void handleAddBlockPoolError(AddBlockPoolException e)
+      throws IOException {
+    Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        e.getFailingVolumes();
+    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+      handleVolumeFailures(unhealthyDataDirs.keySet());
+    } else {
+      LOG.debug("HandleAddBlockPoolError called with empty exception list");
+    }
+  }
+
   List<BPOfferService> getAllBpOs() {
     return blockPoolManager.getAllNamenodeThreads();
   }
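The try/catch added to initBlockPool above turns a fail-fast startup into a degrade-and-continue one: rather than the first bad volume aborting block pool initialization, the aggregated failures are handed to handleVolumeFailures(), the same path used for disks that die at runtime. The following self-contained sketch illustrates that consumer side of the pattern outside of HDFS; StartupSketch, VolumeScanError and scanVolume are made-up names for illustration, not Hadoop APIs.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical aggregate error, analogous to AddBlockPoolException. */
class VolumeScanError extends RuntimeException {
  private final Map<String, IOException> failures;
  VolumeScanError(Map<String, IOException> failures) {
    this.failures = failures;
  }
  Map<String, IOException> getFailures() {
    return failures;
  }
}

public class StartupSketch {
  /** Scan every volume, collecting failures instead of failing fast. */
  static void addBlockPool(List<String> volumes) {
    Map<String, IOException> failed = new HashMap<>();
    for (String v : volumes) {
      try {
        scanVolume(v);
      } catch (IOException e) {
        failed.put(v, e);        // remember the failure, keep scanning
      }
    }
    if (!failed.isEmpty()) {
      throw new VolumeScanError(failed);
    }
  }

  static void scanVolume(String volume) throws IOException {
    if (volume.startsWith("bad")) {
      throw new IOException("cannot read " + volume);
    }
  }

  public static void main(String[] args) {
    try {
      addBlockPool(List.of("good-1", "bad-2", "good-3"));
    } catch (VolumeScanError e) {
      // Analogous to handleVolumeFailures(): drop only the bad volumes and
      // keep serving from the healthy ones.
      e.getFailures().keySet()
          .forEach(v -> System.out.println("marking failed: " + v));
    }
  }
}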
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java
new file mode 100644
index 00000000000..ef63f006f44
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+/**
+ * This exception collects all IOExceptions thrown when adding block pools and
+ * scanning volumes. It keeps the information about which volume is associated
+ * with an exception.
+ *
+ */
+public class AddBlockPoolException extends RuntimeException {
+  private Map<FsVolumeSpi, IOException> unhealthyDataDirs;
+  public AddBlockPoolException(Map<FsVolumeSpi, IOException>
+      unhealthyDataDirs) {
+    this.unhealthyDataDirs = unhealthyDataDirs;
+  }
+
+  public Map<FsVolumeSpi, IOException> getFailingVolumes() {
+    return unhealthyDataDirs;
+  }
+  @Override
+  public String toString() {
+    return getClass().getName() + ": " + unhealthyDataDirs.toString();
+  }
+}
\ No newline at end of file
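AddBlockPoolException extends RuntimeException, so it can cross the existing FsDatasetSpi#addBlockPool signature without introducing a checked exception, and it carries nothing but the volume-to-IOException map. A hedged usage sketch follows; it assumes Mockito (already used throughout the HDFS test code) to stand in for a volume, since FsVolumeSpi is not trivially instantiable on its own.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
import org.mockito.Mockito;

public class AddBlockPoolExceptionSketch {
  public static void main(String[] args) {
    // A mocked volume stands in for a real FsVolumeImpl here.
    FsVolumeSpi vol = Mockito.mock(FsVolumeSpi.class);

    Map<FsVolumeSpi, IOException> unhealthy = new HashMap<>();
    unhealthy.put(vol, new IOException("finalized dir is unreadable"));

    AddBlockPoolException e = new AddBlockPoolException(unhealthy);

    // The exception is only a container; callers decide what to do with
    // each failing volume and its root cause.
    for (Map.Entry<FsVolumeSpi, IOException> entry
        : e.getFailingVolumes().entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}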
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a0fcb54c695..85b85cf7c64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Iterator;
 import java.util.List;
@@ -188,8 +189,8 @@
       final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
     long totalStartTime = Time.monotonicNow();
-    final List<IOException> exceptions = Collections.synchronizedList(
-        new ArrayList<IOException>());
+    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        new ConcurrentHashMap<FsVolumeSpi, IOException>();
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
@@ -208,7 +209,7 @@ class FsVolumeList {
           } catch (IOException ioe) {
             FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
                 "from " + v + ". Will throw later.", ioe);
-            exceptions.add(ioe);
+            unhealthyDataDirs.put(v, ioe);
           }
         }
       };
@@ -222,13 +223,13 @@ class FsVolumeList {
         throw new IOException(ie);
       }
     }
-    if (!exceptions.isEmpty()) {
-      throw exceptions.get(0);
-    }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG
         .info("Total time to add all replicas to map for block pool "
             + bpid + ": " + totalTimeTaken + "ms");
+    if (!unhealthyDataDirs.isEmpty()) {
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
   }
 
   /**
@@ -398,9 +399,8 @@ class FsVolumeList {
   void addBlockPool(final String bpid, final Configuration conf)
       throws IOException {
     long totalStartTime = Time.monotonicNow();
-
-    final List<IOException> exceptions = Collections.synchronizedList(
-        new ArrayList<IOException>());
+    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        new ConcurrentHashMap<FsVolumeSpi, IOException>();
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
@@ -418,7 +418,7 @@ class FsVolumeList {
           } catch (IOException ioe) {
             FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                 ". Will throw later.", ioe);
-            exceptions.add(ioe);
+            unhealthyDataDirs.put(v, ioe);
           }
         }
       };
@@ -432,15 +432,14 @@ class FsVolumeList {
         throw new IOException(ie);
       }
     }
-    if (!exceptions.isEmpty()) {
-      throw exceptions.get(0);
-    }
-
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool "
         + bpid + ": " + totalTimeTaken + "ms");
+    if (!unhealthyDataDirs.isEmpty()) {
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
   }
-  
+
   void removeBlockPool(String bpid,
       Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume) {
     for (FsVolumeImpl v : volumes) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 29807ec4e0c..cb335e17f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2362,14 +2362,19 @@ public class MiniDFSCluster implements AutoCloseable {
     return restartDataNode(dnprop, false);
   }
 
-  private void waitDataNodeFullyStarted(final DataNode dn)
+  public void waitDatanodeFullyStarted(DataNode dn, int timeout)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         return dn.isDatanodeFullyStarted();
       }
-    }, 100, 60000);
+    }, 100, timeout);
+  }
+
+  private void waitDataNodeFullyStarted(final DataNode dn)
+      throws TimeoutException, InterruptedException {
+    waitDatanodeFullyStarted(dn, 60000);
   }
 
   /**
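In both FsVolumeList methods above, the synchronized List<IOException> that only remembered the errors is replaced with a ConcurrentHashMap keyed by volume, and the aggregate exception is thrown only after every scanning thread has been joined and the total time has been logged. A minimal standalone sketch of that producer side follows (the consumer side was sketched after the DataNode change); ParallelScanSketch and scan are hypothetical names, and IllegalStateException stands in for AddBlockPoolException.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ParallelScanSketch {
  public static void main(String[] args) throws InterruptedException {
    List<String> volumes = List.of("disk0", "disk1-broken", "disk2");
    // ConcurrentHashMap keyed by volume: each worker records its own failure,
    // so no external synchronization (or synchronizedList) is needed.
    Map<String, IOException> unhealthy = new ConcurrentHashMap<>();

    List<Thread> workers = new ArrayList<>();
    for (String v : volumes) {
      Thread t = new Thread(() -> {
        try {
          scan(v);
        } catch (IOException ioe) {
          unhealthy.put(v, ioe);   // keep scanning the other volumes
        }
      });
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      t.join();
    }
    // Only after every volume has been scanned (and the timing logged in the
    // real code) is the aggregate failure surfaced to the caller.
    if (!unhealthy.isEmpty()) {
      throw new IllegalStateException("failed volumes: " + unhealthy.keySet());
    }
  }

  private static void scan(String volume) throws IOException {
    if (volume.contains("broken")) {
      throw new IOException("cannot list blocks on " + volume);
    }
  }
}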
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index eb9461f7467..a43536fe67b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -430,7 +430,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    * Class used for tracking datanode level storage utilization similar
    * to {@link FSVolumeSet}
    */
-  private static class SimulatedStorage {
+  static class SimulatedStorage {
     private final Map<String, SimulatedBPStorage> map = new ConcurrentHashMap<>();
 
@@ -615,7 +615,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public StorageLocation getStorageLocation() {
-      return null;
+      try {
+        return StorageLocation.parse("[DISK]file:///simulated");
+      } catch (Exception e) {
+        return null;
+      }
     }
 
     @Override
@@ -663,6 +667,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   private final DataNode datanode;
 
+  public List<SimulatedStorage> getStorages() {
+    return storages;
+  }
+
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
     this(null, storage, conf);
   }
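The simulated volume now reports a parseable StorageLocation instead of null, presumably so that the volume-failure path has something concrete to record for the failed volume. A small sketch of the location format follows; it reuses the same "[DISK]file:///simulated" string as the change above and assumes the StorageLocation.parse, getStorageType and getUri accessors (getUri is also used by the test further down).

import org.apache.hadoop.hdfs.server.datanode.StorageLocation;

public class StorageLocationSketch {
  public static void main(String[] args) throws Exception {
    // An optional [TYPE] prefix followed by a URI; a bare path is also
    // accepted and defaults to DISK.
    StorageLocation loc = StorageLocation.parse("[DISK]file:///simulated");
    System.out.println(loc.getStorageType()); // DISK
    System.out.println(loc.getUri());         // file:///simulated
  }
}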
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 7d04942f0e0..4c691b4ee15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,12 +32,16 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -219,6 +225,50 @@ public class TestDataNodeVolumeFailure {
         " is created and replicated");
   }
 
+  /*
+   * If one of the sub-folders under the finalized directory is unreadable,
+   * either due to permissions or a filesystem corruption, the DN will fail
+   * to read it when scanning it for blocks to load into the replica map. This
+   * test ensures the DN does not exit and reports the failed volume to the
+   * NN (HDFS-14333). This is done by using a simulated FsDataset that throws
+   * an exception for a failed volume when the block pool is initialized.
+   */
+  @Test(timeout=15000)
+  public void testDnStartsAfterDiskErrorScanningBlockPool() throws Exception {
+    // Don't use the cluster configured in the setup() method for this test.
+    cluster.shutdown(true);
+    cluster.close();
+
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        BadDiskFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster localCluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+
+    try {
+      localCluster.waitActive();
+      DataNode dn = localCluster.getDataNodes().get(0);
+
+      try {
+        localCluster.waitDatanodeFullyStarted(dn, 3000);
+      } catch (TimeoutException e) {
+        fail("Datanode did not get fully started");
+      }
+      assertTrue(dn.isDatanodeUp());
+
+      // trigger DN to send heartbeat
+      DataNodeTestUtils.triggerHeartbeat(dn);
+      final BlockManager bm = localCluster.getNamesystem().getBlockManager();
+      // trigger NN handle heartbeat
+      BlockManagerTestUtil.checkHeartbeat(bm);
+
+      // NN now should have the failed volume
+      assertEquals(1, localCluster.getNamesystem().getVolumeFailuresTotal());
+    } finally {
+      localCluster.close();
+    }
+  }
+
   /**
    * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
    * after failure.
@@ -758,4 +808,64 @@ public class TestDataNodeVolumeFailure {
     }
     return total;
   }
+
+  private static class BadDiskFSDataset extends SimulatedFSDataset {
+
+    BadDiskFSDataset(DataStorage storage, Configuration conf) {
+      super(storage, conf);
+    }
+
+    private String[] failedStorageLocations = null;
+
+    @Override
+    public void addBlockPool(String bpid, Configuration conf) {
+      super.addBlockPool(bpid, conf);
+      Map<FsVolumeSpi, IOException>
+          unhealthyDataDirs = new HashMap<>();
+      unhealthyDataDirs.put(this.getStorages().get(0).getVolume(),
+          new IOException());
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
+
+    @Override
+    public synchronized void removeVolumes(Collection<StorageLocation> volumes,
+        boolean clearFailure) {
+      Iterator<StorageLocation> itr = volumes.iterator();
+      String[] failedLocations = new String[volumes.size()];
+      int index = 0;
+      while(itr.hasNext()) {
+        StorageLocation s = itr.next();
+        failedLocations[index] = s.getUri().getPath();
+        index += 1;
+      }
+      failedStorageLocations = failedLocations;
+    }
+
+    @Override
+    public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+      // do nothing
+    }
+
+    @Override
+    public VolumeFailureSummary getVolumeFailureSummary() {
+      if (failedStorageLocations != null) {
+        return new VolumeFailureSummary(failedStorageLocations, 0, 0);
+      } else {
+        return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
+      }
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<BadDiskFSDataset> {
+      @Override
+      public BadDiskFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new BadDiskFSDataset(storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }
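The new test plugs BadDiskFSDataset in through the fsdataset factory key and then polls with the public waitDatanodeFullyStarted(dn, 3000) overload added to MiniDFSCluster above. The sketch below shows the underlying GenericTestUtils.waitFor idiom on its own, assuming the Supplier-based overload visible in the MiniDFSCluster diff (poll interval first, then the overall timeout); the AtomicBoolean and the background thread are only stand-ins for the datanode start-up condition.

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.test.GenericTestUtils;

public class WaitForSketch {
  public static void main(String[] args) throws Exception {
    AtomicBoolean started = new AtomicBoolean(false);
    new Thread(() -> {
      try {
        Thread.sleep(500);          // simulate slow startup work
      } catch (InterruptedException ignored) {
      }
      started.set(true);
    }).start();

    try {
      // Poll every 100 ms, give up after 3 seconds - the same knobs the
      // waitDatanodeFullyStarted(dn, 3000) call above relies on.
      GenericTestUtils.waitFor(started::get, 100, 3000);
      System.out.println("condition reached");
    } catch (TimeoutException e) {
      System.out.println("condition not reached in time");
    }
  }
}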
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index c4d1e57e720..2d939fad261 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -500,6 +500,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
    * @param level the level to set
    */
   public static void setFsDatasetImplLogLevel(Level level) {
-    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
+    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG,
+        org.slf4j.event.Level.valueOf(level.toString()));
   }
 }
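The FsDatasetImplTestUtils change bridges the log4j Level parameter onto the slf4j-based setLogLevel overload by round-tripping through the level name. That works for the levels tests pass here (TRACE, DEBUG, INFO, WARN, ERROR), but names with no slf4j equivalent, such as FATAL, ALL or OFF, would make Level.valueOf throw. A small sketch of both cases:

import org.apache.log4j.Level;

public class LevelConversionSketch {
  public static void main(String[] args) {
    // WARN, ERROR, INFO, DEBUG and TRACE map cleanly onto org.slf4j.event.Level.
    System.out.println(org.slf4j.event.Level.valueOf(Level.WARN.toString()));

    try {
      // FATAL (like ALL and OFF) has no slf4j equivalent, so valueOf throws.
      org.slf4j.event.Level.valueOf(Level.FATAL.toString());
    } catch (IllegalArgumentException e) {
      System.out.println("no slf4j mapping for FATAL: " + e.getMessage());
    }
  }
}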