HDFS-14333. Datanode fails to start if any disk has errors during Namenode registration. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit 34b14061b3
)
This commit is contained in:
parent
b6f6c34223
commit
a21e2e4dbc
|
@ -166,6 +166,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour
|
|||
import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
|
||||
|
@ -1681,13 +1682,37 @@ public class DataNode extends ReconfigurableBase
|
|||
// Exclude failed disks before initializing the block pools to avoid startup
|
||||
// failures.
|
||||
checkDiskError();
|
||||
|
||||
try {
|
||||
data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
|
||||
} catch (AddBlockPoolException e) {
|
||||
handleAddBlockPoolError(e);
|
||||
}
|
||||
blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
|
||||
initDirectoryScanner(getConf());
|
||||
initDiskBalancer(data, getConf());
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles an AddBlockPoolException object thrown from
|
||||
* {@link org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList#
|
||||
* addBlockPool}. Will ensure that all volumes that encounted a
|
||||
* AddBlockPoolException are removed from the DataNode and marked as failed
|
||||
* volumes in the same way as a runtime volume failure.
|
||||
*
|
||||
* @param e this exception is a container for all IOException objects caught
|
||||
* in FsVolumeList#addBlockPool.
|
||||
*/
|
||||
private void handleAddBlockPoolError(AddBlockPoolException e)
|
||||
throws IOException {
|
||||
Map<FsVolumeSpi, IOException> unhealthyDataDirs =
|
||||
e.getFailingVolumes();
|
||||
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
|
||||
handleVolumeFailures(unhealthyDataDirs.keySet());
|
||||
} else {
|
||||
LOG.debug("HandleAddBlockPoolError called with empty exception list");
|
||||
}
|
||||
}
|
||||
|
||||
List<BPOfferService> getAllBpOs() {
|
||||
return blockPoolManager.getAllNamenodeThreads();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
|
||||
/**
|
||||
* This exception collects all IOExceptions thrown when adding block pools and
|
||||
* scanning volumes. It keeps the information about which volume is associated
|
||||
* with an exception.
|
||||
*
|
||||
*/
|
||||
public class AddBlockPoolException extends RuntimeException {
|
||||
private Map<FsVolumeSpi, IOException> unhealthyDataDirs;
|
||||
public AddBlockPoolException(Map<FsVolumeSpi, IOException>
|
||||
unhealthyDataDirs) {
|
||||
this.unhealthyDataDirs = unhealthyDataDirs;
|
||||
}
|
||||
|
||||
public Map<FsVolumeSpi, IOException> getFailingVolumes() {
|
||||
return unhealthyDataDirs;
|
||||
}
|
||||
@Override
|
||||
public String toString() {
|
||||
return getClass().getName() + ": " + unhealthyDataDirs.toString();
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -188,8 +189,8 @@ class FsVolumeList {
|
|||
final RamDiskReplicaTracker ramDiskReplicaMap)
|
||||
throws IOException {
|
||||
long totalStartTime = Time.monotonicNow();
|
||||
final List<IOException> exceptions = Collections.synchronizedList(
|
||||
new ArrayList<IOException>());
|
||||
final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
|
||||
new ConcurrentHashMap<FsVolumeSpi, IOException>();
|
||||
List<Thread> replicaAddingThreads = new ArrayList<Thread>();
|
||||
for (final FsVolumeImpl v : volumes) {
|
||||
Thread t = new Thread() {
|
||||
|
@ -208,7 +209,7 @@ class FsVolumeList {
|
|||
} catch (IOException ioe) {
|
||||
FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
|
||||
"from " + v + ". Will throw later.", ioe);
|
||||
exceptions.add(ioe);
|
||||
unhealthyDataDirs.put(v, ioe);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -222,13 +223,13 @@ class FsVolumeList {
|
|||
throw new IOException(ie);
|
||||
}
|
||||
}
|
||||
if (!exceptions.isEmpty()) {
|
||||
throw exceptions.get(0);
|
||||
}
|
||||
long totalTimeTaken = Time.monotonicNow() - totalStartTime;
|
||||
FsDatasetImpl.LOG
|
||||
.info("Total time to add all replicas to map for block pool " + bpid
|
||||
+ ": " + totalTimeTaken + "ms");
|
||||
if (!unhealthyDataDirs.isEmpty()) {
|
||||
throw new AddBlockPoolException(unhealthyDataDirs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -398,9 +399,8 @@ class FsVolumeList {
|
|||
|
||||
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
|
||||
long totalStartTime = Time.monotonicNow();
|
||||
|
||||
final List<IOException> exceptions = Collections.synchronizedList(
|
||||
new ArrayList<IOException>());
|
||||
final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
|
||||
new ConcurrentHashMap<FsVolumeSpi, IOException>();
|
||||
List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
|
||||
for (final FsVolumeImpl v : volumes) {
|
||||
Thread t = new Thread() {
|
||||
|
@ -418,7 +418,7 @@ class FsVolumeList {
|
|||
} catch (IOException ioe) {
|
||||
FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
|
||||
". Will throw later.", ioe);
|
||||
exceptions.add(ioe);
|
||||
unhealthyDataDirs.put(v, ioe);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -432,13 +432,12 @@ class FsVolumeList {
|
|||
throw new IOException(ie);
|
||||
}
|
||||
}
|
||||
if (!exceptions.isEmpty()) {
|
||||
throw exceptions.get(0);
|
||||
}
|
||||
|
||||
long totalTimeTaken = Time.monotonicNow() - totalStartTime;
|
||||
FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
|
||||
bpid + ": " + totalTimeTaken + "ms");
|
||||
if (!unhealthyDataDirs.isEmpty()) {
|
||||
throw new AddBlockPoolException(unhealthyDataDirs);
|
||||
}
|
||||
}
|
||||
|
||||
void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
|
||||
|
|
|
@ -2362,14 +2362,19 @@ public class MiniDFSCluster implements AutoCloseable {
|
|||
return restartDataNode(dnprop, false);
|
||||
}
|
||||
|
||||
private void waitDataNodeFullyStarted(final DataNode dn)
|
||||
public void waitDatanodeFullyStarted(DataNode dn, int timeout)
|
||||
throws TimeoutException, InterruptedException {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
return dn.isDatanodeFullyStarted();
|
||||
}
|
||||
}, 100, 60000);
|
||||
}, 100, timeout);
|
||||
}
|
||||
|
||||
private void waitDataNodeFullyStarted(final DataNode dn)
|
||||
throws TimeoutException, InterruptedException {
|
||||
waitDatanodeFullyStarted(dn, 60000);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -430,7 +430,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
* Class used for tracking datanode level storage utilization similar
|
||||
* to {@link FSVolumeSet}
|
||||
*/
|
||||
private static class SimulatedStorage {
|
||||
static class SimulatedStorage {
|
||||
private final Map<String, SimulatedBPStorage> map =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
|
@ -615,8 +615,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
|
||||
@Override
|
||||
public StorageLocation getStorageLocation() {
|
||||
try {
|
||||
return StorageLocation.parse("[DISK]file:///simulated");
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getBaseURI() {
|
||||
|
@ -663,6 +667,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
private final DataNode datanode;
|
||||
|
||||
|
||||
public List<SimulatedStorage> getStorages() {
|
||||
return storages;
|
||||
}
|
||||
|
||||
public SimulatedFSDataset(DataStorage storage, Configuration conf) {
|
||||
this(null, storage, conf);
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -31,12 +32,16 @@ import java.net.InetSocketAddress;
|
|||
import java.net.Socket;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.filefilter.TrueFileFilter;
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
|
@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
|||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
|
@ -219,6 +225,50 @@ public class TestDataNodeVolumeFailure {
|
|||
" is created and replicated");
|
||||
}
|
||||
|
||||
/*
|
||||
* If one of the sub-folders under the finalized directory is unreadable,
|
||||
* either due to permissions or a filesystem corruption, the DN will fail
|
||||
* to read it when scanning it for blocks to load into the replica map. This
|
||||
* test ensures the DN does not exit and reports the failed volume to the
|
||||
* NN (HDFS-14333). This is done by using a simulated FsDataset that throws
|
||||
* an exception for a failed volume when the block pool is initialized.
|
||||
*/
|
||||
@Test(timeout=15000)
|
||||
public void testDnStartsAfterDiskErrorScanningBlockPool() throws Exception {
|
||||
// Don't use the cluster configured in the setup() method for this test.
|
||||
cluster.shutdown(true);
|
||||
cluster.close();
|
||||
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
|
||||
BadDiskFSDataset.Factory.class.getName());
|
||||
|
||||
final MiniDFSCluster localCluster = new MiniDFSCluster
|
||||
.Builder(conf).numDataNodes(1).build();
|
||||
|
||||
try {
|
||||
localCluster.waitActive();
|
||||
DataNode dn = localCluster.getDataNodes().get(0);
|
||||
|
||||
try {
|
||||
localCluster.waitDatanodeFullyStarted(dn, 3000);
|
||||
} catch (TimeoutException e) {
|
||||
fail("Datanode did not get fully started");
|
||||
}
|
||||
assertTrue(dn.isDatanodeUp());
|
||||
|
||||
// trigger DN to send heartbeat
|
||||
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||
final BlockManager bm = localCluster.getNamesystem().getBlockManager();
|
||||
// trigger NN handle heartbeat
|
||||
BlockManagerTestUtil.checkHeartbeat(bm);
|
||||
|
||||
// NN now should have the failed volume
|
||||
assertEquals(1, localCluster.getNamesystem().getVolumeFailuresTotal());
|
||||
} finally {
|
||||
localCluster.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that DataStorage and BlockPoolSliceStorage remove the failed volume
|
||||
* after failure.
|
||||
|
@ -758,4 +808,64 @@ public class TestDataNodeVolumeFailure {
|
|||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
private static class BadDiskFSDataset extends SimulatedFSDataset {
|
||||
|
||||
BadDiskFSDataset(DataStorage storage, Configuration conf) {
|
||||
super(storage, conf);
|
||||
}
|
||||
|
||||
private String[] failedStorageLocations = null;
|
||||
|
||||
@Override
|
||||
public void addBlockPool(String bpid, Configuration conf) {
|
||||
super.addBlockPool(bpid, conf);
|
||||
Map<FsVolumeSpi, IOException>
|
||||
unhealthyDataDirs = new HashMap<>();
|
||||
unhealthyDataDirs.put(this.getStorages().get(0).getVolume(),
|
||||
new IOException());
|
||||
throw new AddBlockPoolException(unhealthyDataDirs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeVolumes(Collection<StorageLocation> volumes,
|
||||
boolean clearFailure) {
|
||||
Iterator<StorageLocation> itr = volumes.iterator();
|
||||
String[] failedLocations = new String[volumes.size()];
|
||||
int index = 0;
|
||||
while(itr.hasNext()) {
|
||||
StorageLocation s = itr.next();
|
||||
failedLocations[index] = s.getUri().getPath();
|
||||
index += 1;
|
||||
}
|
||||
failedStorageLocations = failedLocations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
public VolumeFailureSummary getVolumeFailureSummary() {
|
||||
if (failedStorageLocations != null) {
|
||||
return new VolumeFailureSummary(failedStorageLocations, 0, 0);
|
||||
} else {
|
||||
return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
static class Factory extends FsDatasetSpi.Factory<BadDiskFSDataset> {
|
||||
@Override
|
||||
public BadDiskFSDataset newInstance(DataNode datanode,
|
||||
DataStorage storage, Configuration conf) throws IOException {
|
||||
return new BadDiskFSDataset(storage, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSimulated() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -500,6 +500,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
|
|||
* @param level the level to set
|
||||
*/
|
||||
public static void setFsDatasetImplLogLevel(Level level) {
|
||||
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
|
||||
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG,
|
||||
org.slf4j.event.Level.valueOf(level.toString()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue