HDFS-14333. Datanode fails to start if any disk has errors during Namenode registration. Contributed by Stephen O'Donnell.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Stephen O'Donnell 2019-03-12 10:16:28 -07:00 committed by Wei-Chiu Chuang
parent aab7b77536
commit 34b14061b3
7 changed files with 215 additions and 22 deletions

View File

@ -166,6 +166,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour
import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker; import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
@ -1689,13 +1690,37 @@ public class DataNode extends ReconfigurableBase
// Exclude failed disks before initializing the block pools to avoid startup // Exclude failed disks before initializing the block pools to avoid startup
// failures. // failures.
checkDiskError(); checkDiskError();
try {
data.addBlockPool(nsInfo.getBlockPoolID(), getConf()); data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
} catch (AddBlockPoolException e) {
handleAddBlockPoolError(e);
}
blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
initDirectoryScanner(getConf()); initDirectoryScanner(getConf());
initDiskBalancer(data, getConf()); initDiskBalancer(data, getConf());
} }
/**
* Handles an AddBlockPoolException object thrown from
* {@link org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList#
* addBlockPool}. Will ensure that all volumes that encounted a
* AddBlockPoolException are removed from the DataNode and marked as failed
* volumes in the same way as a runtime volume failure.
*
* @param e this exception is a container for all IOException objects caught
* in FsVolumeList#addBlockPool.
*/
private void handleAddBlockPoolError(AddBlockPoolException e)
throws IOException {
Map<FsVolumeSpi, IOException> unhealthyDataDirs =
e.getFailingVolumes();
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
handleVolumeFailures(unhealthyDataDirs.keySet());
} else {
LOG.debug("HandleAddBlockPoolError called with empty exception list");
}
}
List<BPOfferService> getAllBpOs() { List<BPOfferService> getAllBpOs() {
return blockPoolManager.getAllNamenodeThreads(); return blockPoolManager.getAllNamenodeThreads();
} }

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
/**
* This exception collects all IOExceptions thrown when adding block pools and
* scanning volumes. It keeps the information about which volume is associated
* with an exception.
*
*/
public class AddBlockPoolException extends RuntimeException {
private Map<FsVolumeSpi, IOException> unhealthyDataDirs;
public AddBlockPoolException(Map<FsVolumeSpi, IOException>
unhealthyDataDirs) {
this.unhealthyDataDirs = unhealthyDataDirs;
}
public Map<FsVolumeSpi, IOException> getFailingVolumes() {
return unhealthyDataDirs;
}
@Override
public String toString() {
return getClass().getName() + ": " + unhealthyDataDirs.toString();
}
}

View File

@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -188,8 +189,8 @@ class FsVolumeList {
final RamDiskReplicaTracker ramDiskReplicaMap) final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException { throws IOException {
long totalStartTime = Time.monotonicNow(); long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList( final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
new ArrayList<IOException>()); new ConcurrentHashMap<FsVolumeSpi, IOException>();
List<Thread> replicaAddingThreads = new ArrayList<Thread>(); List<Thread> replicaAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes) { for (final FsVolumeImpl v : volumes) {
Thread t = new Thread() { Thread t = new Thread() {
@ -208,7 +209,7 @@ class FsVolumeList {
} catch (IOException ioe) { } catch (IOException ioe) {
FsDatasetImpl.LOG.info("Caught exception while adding replicas " + FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
"from " + v + ". Will throw later.", ioe); "from " + v + ". Will throw later.", ioe);
exceptions.add(ioe); unhealthyDataDirs.put(v, ioe);
} }
} }
}; };
@ -222,13 +223,13 @@ class FsVolumeList {
throw new IOException(ie); throw new IOException(ie);
} }
} }
if (!exceptions.isEmpty()) {
throw exceptions.get(0);
}
long totalTimeTaken = Time.monotonicNow() - totalStartTime; long totalTimeTaken = Time.monotonicNow() - totalStartTime;
FsDatasetImpl.LOG FsDatasetImpl.LOG
.info("Total time to add all replicas to map for block pool " + bpid .info("Total time to add all replicas to map for block pool " + bpid
+ ": " + totalTimeTaken + "ms"); + ": " + totalTimeTaken + "ms");
if (!unhealthyDataDirs.isEmpty()) {
throw new AddBlockPoolException(unhealthyDataDirs);
}
} }
/** /**
@ -398,9 +399,8 @@ class FsVolumeList {
void addBlockPool(final String bpid, final Configuration conf) throws IOException { void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow(); long totalStartTime = Time.monotonicNow();
final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
final List<IOException> exceptions = Collections.synchronizedList( new ConcurrentHashMap<FsVolumeSpi, IOException>();
new ArrayList<IOException>());
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>(); List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
for (final FsVolumeImpl v : volumes) { for (final FsVolumeImpl v : volumes) {
Thread t = new Thread() { Thread t = new Thread() {
@ -418,7 +418,7 @@ class FsVolumeList {
} catch (IOException ioe) { } catch (IOException ioe) {
FsDatasetImpl.LOG.info("Caught exception while scanning " + v + FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
". Will throw later.", ioe); ". Will throw later.", ioe);
exceptions.add(ioe); unhealthyDataDirs.put(v, ioe);
} }
} }
}; };
@ -432,15 +432,14 @@ class FsVolumeList {
throw new IOException(ie); throw new IOException(ie);
} }
} }
if (!exceptions.isEmpty()) {
throw exceptions.get(0);
}
long totalTimeTaken = Time.monotonicNow() - totalStartTime; long totalTimeTaken = Time.monotonicNow() - totalStartTime;
FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " + FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
bpid + ": " + totalTimeTaken + "ms"); bpid + ": " + totalTimeTaken + "ms");
if (!unhealthyDataDirs.isEmpty()) {
throw new AddBlockPoolException(unhealthyDataDirs);
}
} }
void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs> void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
blocksPerVolume) { blocksPerVolume) {
for (FsVolumeImpl v : volumes) { for (FsVolumeImpl v : volumes) {

View File

@ -2403,14 +2403,19 @@ public class MiniDFSCluster implements AutoCloseable {
return restartDataNode(dnprop, false); return restartDataNode(dnprop, false);
} }
private void waitDataNodeFullyStarted(final DataNode dn) public void waitDatanodeFullyStarted(DataNode dn, int timeout)
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override @Override
public Boolean get() { public Boolean get() {
return dn.isDatanodeFullyStarted(); return dn.isDatanodeFullyStarted();
} }
}, 100, 60000); }, 100, timeout);
}
private void waitDataNodeFullyStarted(final DataNode dn)
throws TimeoutException, InterruptedException {
waitDatanodeFullyStarted(dn, 60000);
} }
/** /**

View File

@ -450,7 +450,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
* Class used for tracking datanode level storage utilization similar * Class used for tracking datanode level storage utilization similar
* to {@link FSVolumeSet} * to {@link FSVolumeSet}
*/ */
private static class SimulatedStorage { static class SimulatedStorage {
private final Map<String, SimulatedBPStorage> map = private final Map<String, SimulatedBPStorage> map =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
@ -635,7 +635,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override @Override
public StorageLocation getStorageLocation() { public StorageLocation getStorageLocation() {
return null; try {
return StorageLocation.parse("[DISK]file:///simulated");
} catch (Exception e) {
return null;
}
} }
@Override @Override
@ -681,6 +685,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private final String datanodeUuid; private final String datanodeUuid;
private final DataNode datanode; private final DataNode datanode;
public List<SimulatedStorage> getStorages() {
return storages;
}
public SimulatedFSDataset(DataStorage storage, Configuration conf) { public SimulatedFSDataset(DataStorage storage, Configuration conf) {
this(null, storage, conf); this(null, storage, conf);
} }

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -31,12 +32,16 @@ import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter; import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -219,6 +225,50 @@ public class TestDataNodeVolumeFailure {
" is created and replicated"); " is created and replicated");
} }
/*
* If one of the sub-folders under the finalized directory is unreadable,
* either due to permissions or a filesystem corruption, the DN will fail
* to read it when scanning it for blocks to load into the replica map. This
* test ensures the DN does not exit and reports the failed volume to the
* NN (HDFS-14333). This is done by using a simulated FsDataset that throws
* an exception for a failed volume when the block pool is initialized.
*/
@Test(timeout=15000)
public void testDnStartsAfterDiskErrorScanningBlockPool() throws Exception {
// Don't use the cluster configured in the setup() method for this test.
cluster.shutdown(true);
cluster.close();
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
BadDiskFSDataset.Factory.class.getName());
final MiniDFSCluster localCluster = new MiniDFSCluster
.Builder(conf).numDataNodes(1).build();
try {
localCluster.waitActive();
DataNode dn = localCluster.getDataNodes().get(0);
try {
localCluster.waitDatanodeFullyStarted(dn, 3000);
} catch (TimeoutException e) {
fail("Datanode did not get fully started");
}
assertTrue(dn.isDatanodeUp());
// trigger DN to send heartbeat
DataNodeTestUtils.triggerHeartbeat(dn);
final BlockManager bm = localCluster.getNamesystem().getBlockManager();
// trigger NN handle heartbeat
BlockManagerTestUtil.checkHeartbeat(bm);
// NN now should have the failed volume
assertEquals(1, localCluster.getNamesystem().getVolumeFailuresTotal());
} finally {
localCluster.close();
}
}
/** /**
* Test that DataStorage and BlockPoolSliceStorage remove the failed volume * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
* after failure. * after failure.
@ -760,4 +810,64 @@ public class TestDataNodeVolumeFailure {
} }
return total; return total;
} }
private static class BadDiskFSDataset extends SimulatedFSDataset {
BadDiskFSDataset(DataStorage storage, Configuration conf) {
super(storage, conf);
}
private String[] failedStorageLocations = null;
@Override
public void addBlockPool(String bpid, Configuration conf) {
super.addBlockPool(bpid, conf);
Map<FsVolumeSpi, IOException>
unhealthyDataDirs = new HashMap<>();
unhealthyDataDirs.put(this.getStorages().get(0).getVolume(),
new IOException());
throw new AddBlockPoolException(unhealthyDataDirs);
}
@Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes,
boolean clearFailure) {
Iterator<StorageLocation> itr = volumes.iterator();
String[] failedLocations = new String[volumes.size()];
int index = 0;
while(itr.hasNext()) {
StorageLocation s = itr.next();
failedLocations[index] = s.getUri().getPath();
index += 1;
}
failedStorageLocations = failedLocations;
}
@Override
public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
// do nothing
}
@Override
public VolumeFailureSummary getVolumeFailureSummary() {
if (failedStorageLocations != null) {
return new VolumeFailureSummary(failedStorageLocations, 0, 0);
} else {
return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
}
}
static class Factory extends FsDatasetSpi.Factory<BadDiskFSDataset> {
@Override
public BadDiskFSDataset newInstance(DataNode datanode,
DataStorage storage, Configuration conf) throws IOException {
return new BadDiskFSDataset(storage, conf);
}
@Override
public boolean isSimulated() {
return true;
}
}
}
} }

View File

@ -500,6 +500,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
* @param level the level to set * @param level the level to set
*/ */
public static void setFsDatasetImplLogLevel(Level level) { public static void setFsDatasetImplLogLevel(Level level) {
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level); GenericTestUtils.setLogLevel(FsDatasetImpl.LOG,
org.slf4j.event.Level.valueOf(level.toString()));
} }
} }