HDFS-7722. DataNode#checkDiskError should also remove Storage when error is found. (Lei Xu via Colin P. McCabe)

This commit is contained in:
Colin Patrick Mccabe 2015-03-12 12:00:18 -07:00
parent 6dae6d12ec
commit b49c3a1813
13 changed files with 330 additions and 133 deletions

View File

@ -1162,6 +1162,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7806. Refactor: move StorageType from hadoop-hdfs to HDFS-7806. Refactor: move StorageType from hadoop-hdfs to
hadoop-common. (Xiaoyu Yao via Arpit Agarwal) hadoop-common. (Xiaoyu Yao via Arpit Agarwal)
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
is found. (Lei Xu via Colin P. McCabe)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -53,6 +53,7 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -73,6 +74,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -613,20 +615,16 @@ public class DataNode extends ReconfigurableBase
errorMessageBuilder.append( errorMessageBuilder.append(
String.format("FAILED to ADD: %s: %s%n", volume, String.format("FAILED to ADD: %s: %s%n", volume,
e.toString())); e.toString()));
LOG.error("Failed to add volume: " + volume, e);
} }
} }
} }
if (!changedVolumes.deactivateLocations.isEmpty()) {
LOG.info("Deactivating volumes: " +
Joiner.on(",").join(changedVolumes.deactivateLocations));
data.removeVolumes(changedVolumes.deactivateLocations);
try { try {
storage.removeVolumes(changedVolumes.deactivateLocations); removeVolumes(changedVolumes.deactivateLocations);
} catch (IOException e) { } catch (IOException e) {
errorMessageBuilder.append(e.getMessage()); errorMessageBuilder.append(e.getMessage());
} LOG.error("Failed to remove volume: " + e.getMessage(), e);
} }
if (errorMessageBuilder.length() > 0) { if (errorMessageBuilder.length() > 0) {
@ -639,6 +637,79 @@ public class DataNode extends ReconfigurableBase
} }
} }
/**
* Remove volumes from DataNode.
* See {@link removeVolumes(final Set<File>, boolean)} for details.
*
* @param locations the StorageLocations of the volumes to be removed.
* @throws IOException
*/
private void removeVolumes(final Collection<StorageLocation> locations)
throws IOException {
if (locations.isEmpty()) {
return;
}
Set<File> volumesToRemove = new HashSet<>();
for (StorageLocation loc : locations) {
volumesToRemove.add(loc.getFile().getAbsoluteFile());
}
removeVolumes(volumesToRemove, true);
}
/**
* Remove volumes from DataNode.
*
* It does three things:
* <li>
* <ul>Remove volumes and block info from FsDataset.</ul>
* <ul>Remove volumes from DataStorage.</ul>
* <ul>Reset configuration DATA_DIR and {@link dataDirs} to represent
* active volumes.</ul>
* </li>
* @param absoluteVolumePaths the absolute path of volumes.
* @param clearFailure if true, clears the failure information related to the
* volumes.
* @throws IOException
*/
private synchronized void removeVolumes(
final Set<File> absoluteVolumePaths, boolean clearFailure)
throws IOException {
for (File vol : absoluteVolumePaths) {
Preconditions.checkArgument(vol.isAbsolute());
}
if (absoluteVolumePaths.isEmpty()) {
return;
}
LOG.info(String.format("Deactivating volumes (clear failure=%b): %s",
clearFailure, Joiner.on(",").join(absoluteVolumePaths)));
IOException ioe = null;
// Remove volumes and block infos from FsDataset.
data.removeVolumes(absoluteVolumePaths, clearFailure);
// Remove volumes from DataStorage.
try {
storage.removeVolumes(absoluteVolumePaths);
} catch (IOException e) {
ioe = e;
}
// Set configuration and dataDirs to reflect volume changes.
for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) {
StorageLocation loc = it.next();
if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) {
it.remove();
}
}
conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
if (ioe != null) {
throw ioe;
}
}
private synchronized void setClusterId(final String nsCid, final String bpid private synchronized void setClusterId(final String nsCid, final String bpid
) throws IOException { ) throws IOException {
if(clusterId != null && !clusterId.equals(nsCid)) { if(clusterId != null && !clusterId.equals(nsCid)) {
@ -3076,10 +3147,20 @@ public class DataNode extends ReconfigurableBase
* Check the disk error * Check the disk error
*/ */
private void checkDiskError() { private void checkDiskError() {
Set<File> unhealthyDataDirs = data.checkDataDir();
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
try { try {
data.checkDataDir(); // Remove all unhealthy volumes from DataNode.
} catch (DiskErrorException de) { removeVolumes(unhealthyDataDirs, false);
handleDiskError(de.getMessage()); } catch (IOException e) {
LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e);
}
StringBuilder sb = new StringBuilder("DataNode failed volumes:");
for (File dataDir : unhealthyDataDirs) {
sb.append(dataDir.getAbsolutePath() + ";");
}
handleDiskError(sb.toString());
} }
} }

View File

@ -404,28 +404,23 @@ public class DataStorage extends Storage {
} }
/** /**
* Remove volumes from DataStorage. All volumes are removed even when the * Remove storage dirs from DataStorage. All storage dirs are removed even when the
* IOException is thrown. * IOException is thrown.
* *
* @param locations a collection of volumes. * @param dirsToRemove a set of storage directories to be removed.
* @throws IOException if I/O error when unlocking storage directory. * @throws IOException if I/O error when unlocking storage directory.
*/ */
synchronized void removeVolumes(Collection<StorageLocation> locations) synchronized void removeVolumes(final Set<File> dirsToRemove)
throws IOException { throws IOException {
if (locations.isEmpty()) { if (dirsToRemove.isEmpty()) {
return; return;
} }
Set<File> dataDirs = new HashSet<File>();
for (StorageLocation sl : locations) {
dataDirs.add(sl.getFile());
}
StringBuilder errorMsgBuilder = new StringBuilder(); StringBuilder errorMsgBuilder = new StringBuilder();
for (Iterator<StorageDirectory> it = this.storageDirs.iterator(); for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) { it.hasNext(); ) {
StorageDirectory sd = it.next(); StorageDirectory sd = it.next();
if (dataDirs.contains(sd.getRoot())) { if (dirsToRemove.contains(sd.getRoot())) {
// Remove the block pool level storage first. // Remove the block pool level storage first.
for (Map.Entry<String, BlockPoolSliceStorage> entry : for (Map.Entry<String, BlockPoolSliceStorage> entry :
this.bpStorageMap.entrySet()) { this.bpStorageMap.entrySet()) {

View File

@ -27,6 +27,7 @@ import java.io.InputStream;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -113,9 +114,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* If the FSDataset supports block scanning, this function removes * If the FSDataset supports block scanning, this function removes
* the volumes from the block scanner. * the volumes from the block scanner.
* *
* @param volumes The storage locations of the volumes to remove. * @param volumes The paths of the volumes to be removed.
* @param clearFailure set true to clear the failure information about the
* volumes.
*/ */
public void removeVolumes(Collection<StorageLocation> volumes); public void removeVolumes(Set<File> volumes, boolean clearFailure);
/** @return a storage with the given storage ID */ /** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid); public DatanodeStorage getStorage(final String storageUuid);
@ -388,9 +391,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** /**
* Check if all the data directories are healthy * Check if all the data directories are healthy
* @throws DiskErrorException * @return A set of unhealthy data directories.
*/ */
public void checkDataDir() throws DiskErrorException; public Set<File> checkDataDir();
/** /**
* Shutdown the FSDataset * Shutdown the FSDataset

View File

@ -445,41 +445,42 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
} }
/** /**
* Removes a collection of volumes from FsDataset. * Removes a set of volumes from FsDataset.
* @param volumes the root directories of the volumes. * @param volumesToRemove a set of absolute root path of each volume.
* @param clearFailure set true to clear failure information.
* *
* DataNode should call this function before calling * DataNode should call this function before calling
* {@link DataStorage#removeVolumes(java.util.Collection)}. * {@link DataStorage#removeVolumes(java.util.Collection)}.
*/ */
@Override @Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes) { public synchronized void removeVolumes(
Set<String> volumeSet = new HashSet<>(); Set<File> volumesToRemove, boolean clearFailure) {
for (StorageLocation sl : volumes) { // Make sure that all volumes are absolute path.
volumeSet.add(sl.getFile().getAbsolutePath()); for (File vol : volumesToRemove) {
Preconditions.checkArgument(vol.isAbsolute(),
String.format("%s is not absolute path.", vol.getPath()));
} }
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
String volume = sd.getRoot().getAbsolutePath(); final File absRoot = sd.getRoot().getAbsoluteFile();
if (volumeSet.contains(volume)) { if (volumesToRemove.contains(absRoot)) {
LOG.info("Removing " + volume + " from FsDataset."); LOG.info("Removing " + absRoot + " from FsDataset.");
// Disable the volume from the service. // Disable the volume from the service.
asyncDiskService.removeVolume(sd.getCurrentDir()); asyncDiskService.removeVolume(sd.getCurrentDir());
this.volumes.removeVolume(sd.getRoot()); volumes.removeVolume(absRoot, clearFailure);
// Removed all replica information for the blocks on the volume. Unlike // Removed all replica information for the blocks on the volume. Unlike
// updating the volumeMap in addVolume(), this operation does not scan // updating the volumeMap in addVolume(), this operation does not scan
// disks. // disks.
for (String bpid : volumeMap.getBlockPoolList()) { for (String bpid : volumeMap.getBlockPoolList()) {
List<Block> blocks = new ArrayList<Block>();
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator(); for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
it.hasNext(); ) { it.hasNext(); ) {
ReplicaInfo block = it.next(); ReplicaInfo block = it.next();
String absBasePath = final File absBasePath =
new File(block.getVolume().getBasePath()).getAbsolutePath(); new File(block.getVolume().getBasePath()).getAbsoluteFile();
if (absBasePath.equals(volume)) { if (absBasePath.equals(absRoot)) {
invalidate(bpid, block); invalidate(bpid, block);
blocks.add(block);
it.remove(); it.remove();
} }
} }
@ -1975,50 +1976,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/** /**
* check if a data directory is healthy * check if a data directory is healthy
* if some volumes failed - make sure to remove all the blocks that belong *
* to these volumes * if some volumes failed - the caller must emove all the blocks that belong
* @throws DiskErrorException * to these failed volumes.
* @return the failed volumes. Returns null if no volume failed.
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void checkDataDir() throws DiskErrorException { public Set<File> checkDataDir() {
long totalBlocks=0, removedBlocks=0; return volumes.checkDirs();
List<FsVolumeImpl> failedVols = volumes.checkDirs();
// If there no failed volumes return
if (failedVols == null) {
return;
}
// Otherwise remove blocks for the failed volumes
long mlsec = Time.now();
synchronized (this) {
for (FsVolumeImpl fv: failedVols) {
for (String bpid : fv.getBlockPoolList()) {
Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
while(ib.hasNext()) {
ReplicaInfo b = ib.next();
totalBlocks++;
// check if the volume block belongs to still valid
if(b.getVolume() == fv) {
LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
+ " on failed volume " + fv.getCurrentDir().getAbsolutePath());
ib.remove();
removedBlocks++;
}
}
}
}
} // end of sync
mlsec = Time.now() - mlsec;
LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
"(took " + mlsec + " millisecs)");
// report the error
StringBuilder sb = new StringBuilder();
for (FsVolumeImpl fv : failedVols) {
sb.append(fv.getCurrentDir().getAbsolutePath() + ";");
}
throw new DiskErrorException("DataNode failed volumes:" + sb);
} }

View File

@ -289,7 +289,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
} }
} }
long getDfsUsed() throws IOException { @VisibleForTesting
public long getDfsUsed() throws IOException {
long dfsUsed = 0; long dfsUsed = 0;
synchronized(dataset) { synchronized(dataset) {
for(BlockPoolSlice s : bpSlices.values()) { for(BlockPoolSlice s : bpSlices.values()) {

View File

@ -24,10 +24,12 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -218,16 +220,15 @@ class FsVolumeList {
} }
/** /**
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
* volumes from the active list that result in a DiskErrorException.
* *
* Use checkDirsMutext to allow only one instance of checkDirs() call * Use checkDirsMutext to allow only one instance of checkDirs() call
* *
* @return list of all the removed volumes. * @return list of all the failed volumes.
*/ */
List<FsVolumeImpl> checkDirs() { Set<File> checkDirs() {
synchronized(checkDirsMutex) { synchronized(checkDirsMutex) {
ArrayList<FsVolumeImpl> removedVols = null; Set<File> failedVols = null;
// Make a copy of volumes for performing modification // Make a copy of volumes for performing modification
final List<FsVolumeImpl> volumeList = getVolumes(); final List<FsVolumeImpl> volumeList = getVolumes();
@ -238,12 +239,12 @@ class FsVolumeList {
fsv.checkDirs(); fsv.checkDirs();
} catch (DiskErrorException e) { } catch (DiskErrorException e) {
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e); FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
if (removedVols == null) { if (failedVols == null) {
removedVols = new ArrayList<>(1); failedVols = new HashSet<>(1);
} }
removedVols.add(fsv); failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
removeVolume(fsv);
addVolumeFailureInfo(fsv); addVolumeFailureInfo(fsv);
removeVolume(fsv);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
FsDatasetImpl.LOG.debug("Caught exception when obtaining " + FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
"reference count on closed volume", e); "reference count on closed volume", e);
@ -252,12 +253,12 @@ class FsVolumeList {
} }
} }
if (removedVols != null && removedVols.size() > 0) { if (failedVols != null && failedVols.size() > 0) {
FsDatasetImpl.LOG.warn("Completed checkDirs. Removed " + removedVols.size() FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
+ " volumes. Current volumes: " + this); + " failure volumes.");
} }
return removedVols; return failedVols;
} }
} }
@ -290,6 +291,9 @@ class FsVolumeList {
if (blockScanner != null) { if (blockScanner != null) {
blockScanner.addVolumeScanner(ref); blockScanner.addVolumeScanner(ref);
} }
// If the volume is used to replace a failed volume, it needs to reset the
// volume failure info for this volume.
removeVolumeFailureInfo(new File(ref.getVolume().getBasePath()));
FsDatasetImpl.LOG.info("Added new volume: " + FsDatasetImpl.LOG.info("Added new volume: " +
ref.getVolume().getStorageID()); ref.getVolume().getStorageID());
} }
@ -337,8 +341,9 @@ class FsVolumeList {
/** /**
* Dynamically remove volume in the list. * Dynamically remove volume in the list.
* @param volume the volume to be removed. * @param volume the volume to be removed.
* @param clearFailure set true to remove failure info for this volume.
*/ */
void removeVolume(File volume) { void removeVolume(File volume, boolean clearFailure) {
// Make a copy of volumes to remove one volume. // Make a copy of volumes to remove one volume.
final FsVolumeImpl[] curVolumes = volumes.get(); final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
@ -352,8 +357,10 @@ class FsVolumeList {
removeVolume(fsVolume); removeVolume(fsVolume);
} }
} }
if (clearFailure) {
removeVolumeFailureInfo(volume); removeVolumeFailureInfo(volume);
} }
}
VolumeFailureInfo[] getVolumeFailureInfos() { VolumeFailureInfo[] getVolumeFailureInfos() {
Collection<VolumeFailureInfo> infos = volumeFailureInfos.values(); Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
@ -366,7 +373,9 @@ class FsVolumeList {
} }
private void addVolumeFailureInfo(FsVolumeImpl vol) { private void addVolumeFailureInfo(FsVolumeImpl vol) {
addVolumeFailureInfo(new VolumeFailureInfo(vol.getBasePath(), Time.now(), addVolumeFailureInfo(new VolumeFailureInfo(
new File(vol.getBasePath()).getAbsolutePath(),
Time.now(),
vol.getCapacity())); vol.getCapacity()));
} }

View File

@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -959,8 +960,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override @Override
public void checkDataDir() throws DiskErrorException { public Set<File> checkDataDir() {
// nothing to check for simulated data set // nothing to check for simulated data set
return null;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
@ -1281,7 +1283,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override @Override
public synchronized void removeVolumes(Collection<StorageLocation> volumes) { public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockMissingException; import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After; import org.junit.After;
@ -95,6 +97,8 @@ public class TestDataNodeHotSwapVolumes {
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
1000); 1000);
/* Allow 1 volume failure */
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
MiniDFSNNTopology nnTopology = MiniDFSNNTopology nnTopology =
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
@ -646,4 +650,65 @@ public class TestDataNodeHotSwapVolumes {
// this directory were removed from the previous step. // this directory were removed from the previous step.
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir); dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
} }
/** Get the FsVolume on the given basePath */
private FsVolumeImpl getVolume(DataNode dn, File basePath) {
for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) {
if (vol.getBasePath().equals(basePath.getPath())) {
return (FsVolumeImpl)vol;
}
}
return null;
}
/**
* Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
* DataNode upon a volume failure. Thus we can run reconfig on the same
* configuration to reload the new volume on the same directory as the failed one.
*/
@Test(timeout=60000)
public void testDirectlyReloadAfterCheckDiskError()
throws IOException, TimeoutException, InterruptedException,
ReconfigurationException {
startDFSCluster(1, 2);
createFile(new Path("/test"), 32, (short)2);
DataNode dn = cluster.getDataNodes().get(0);
final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
File dirToFail = new File(cluster.getDataDirectory(), "data1");
FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
assertTrue("No FsVolume was found for " + dirToFail,
failedVolume != null);
long used = failedVolume.getDfsUsed();
try {
assertTrue("Couldn't chmod local vol: " + dirToFail,
FileUtil.setExecutable(dirToFail, false));
// Call and wait DataNode to detect disk failure.
long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
dn.checkDiskErrorAsync();
while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
}
createFile(new Path("/test1"), 32, (short)2);
assertEquals(used, failedVolume.getDfsUsed());
} finally {
// Need to restore the mode on dirToFail. Otherwise, if an Exception
// is thrown above, the following tests can not delete this data directory
// and thus fail to start MiniDFSCluster.
assertTrue("Couldn't restore executable for: " + dirToFail,
FileUtil.setExecutable(dirToFail, true));
}
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
createFile(new Path("/test2"), 32, (short)2);
FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
assertTrue(restoredVolume != null);
assertTrue(restoredVolume != failedVolume);
// More data has been written to this volume.
assertTrue(restoredVolume.getDfsUsed() > used);
}
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
@ -31,6 +33,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -57,6 +60,10 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -200,6 +207,69 @@ public class TestDataNodeVolumeFailure {
" is created and replicated"); " is created and replicated");
} }
/**
* Test that DataStorage and BlockPoolSliceStorage remove the failed volume
* after failure.
*/
@Test(timeout=150000)
public void testFailedVolumeBeingRemovedFromDataNode()
throws InterruptedException, IOException, TimeoutException {
Path file1 = new Path("/test1");
DFSTestUtil.createFile(fs, file1, 1024, (short) 2, 1L);
DFSTestUtil.waitReplication(fs, file1, (short) 2);
File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
assertTrue(FileUtil.setExecutable(dn0Vol1, false));
DataNode dn0 = cluster.getDataNodes().get(0);
long lastDiskErrorCheck = dn0.getLastDiskErrorCheck();
dn0.checkDiskErrorAsync();
// Wait checkDiskError thread finish to discover volume failure.
while (dn0.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
}
// Verify dn0Vol1 has been completely removed from DN0.
// 1. dn0Vol1 is removed from DataStorage.
DataStorage storage = dn0.getStorage();
assertEquals(1, storage.getNumStorageDirs());
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
Storage.StorageDirectory sd = storage.getStorageDir(i);
assertFalse(sd.getRoot().getAbsolutePath().startsWith(
dn0Vol1.getAbsolutePath()
));
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
BlockPoolSliceStorage bpsStorage = storage.getBPStorage(bpid);
assertEquals(1, bpsStorage.getNumStorageDirs());
for (int i = 0; i < bpsStorage.getNumStorageDirs(); i++) {
Storage.StorageDirectory sd = bpsStorage.getStorageDir(i);
assertFalse(sd.getRoot().getAbsolutePath().startsWith(
dn0Vol1.getAbsolutePath()
));
}
// 2. dn0Vol1 is removed from FsDataset
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
for (FsVolumeSpi volume : data.getVolumes()) {
assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
}
// 3. all blocks on dn0Vol1 have been removed.
for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
assertNotNull(replica.getVolume());
assertNotEquals(
new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
dn0Vol1.getAbsoluteFile());
}
// 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.
String[] dataDirStrs =
dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
assertEquals(1, dataDirStrs.length);
assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
}
/** /**
* Test that there are under replication blocks after vol failures * Test that there are under replication blocks after vol failures
*/ */

View File

@ -403,23 +403,6 @@ public class TestDataNodeVolumeFailureReporting {
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
// Reconfigure each DataNode to remove its failed volumes.
reconfigureDataNode(dns.get(0), dn1Vol2);
reconfigureDataNode(dns.get(1), dn2Vol2);
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
checkFailuresAtDataNode(dns.get(0), 1, true);
checkFailuresAtDataNode(dns.get(1), 1, true);
// NN sees reduced capacity, but no volume failures.
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 0);
checkFailuresAtNameNode(dm, dns.get(0), true);
checkFailuresAtNameNode(dm, dns.get(1), true);
// Reconfigure again to try to add back the failed volumes. // Reconfigure again to try to add back the failed volumes.
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2); reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2); reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
@ -460,6 +443,25 @@ public class TestDataNodeVolumeFailureReporting {
checkAggregateFailuresAtNameNode(false, 2); checkAggregateFailuresAtNameNode(false, 2);
checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath()); checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
// Replace failed volume with healthy volume and run reconfigure DataNode.
// The failed volume information should be cleared.
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, true));
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, true));
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
checkFailuresAtDataNode(dns.get(0), 1, true);
checkFailuresAtDataNode(dns.get(1), 1, true);
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
origCapacity, WAIT_FOR_HEARTBEATS);
checkAggregateFailuresAtNameNode(true, 0);
checkFailuresAtNameNode(dm, dns.get(0), true);
checkFailuresAtNameNode(dm, dns.get(1), true);
} }
/** /**

View File

@ -61,8 +61,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public void removeVolumes(Collection<StorageLocation> volumes) { public void removeVolumes(Set<File> volumes, boolean clearFailure) {
} }
@Override @Override
@ -243,8 +242,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public void checkDataDir() throws DiskErrorException { public Set<File> checkDataDir() {
throw new DiskChecker.DiskErrorException(null); return null;
} }
@Override @Override

View File

@ -195,10 +195,10 @@ public class TestFsDatasetImpl {
final String[] dataDirs = final String[] dataDirs =
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(","); conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
final String volumePathToRemove = dataDirs[0]; final String volumePathToRemove = dataDirs[0];
List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>(); Set<File> volumesToRemove = new HashSet<>();
volumesToRemove.add(StorageLocation.parse(volumePathToRemove)); volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile());
dataset.removeVolumes(volumesToRemove); dataset.removeVolumes(volumesToRemove, true);
int expectedNumVolumes = dataDirs.length - 1; int expectedNumVolumes = dataDirs.length - 1;
assertEquals("The volume has been removed from the volumeList.", assertEquals("The volume has been removed from the volumeList.",
expectedNumVolumes, dataset.getVolumes().size()); expectedNumVolumes, dataset.getVolumes().size());
@ -206,7 +206,7 @@ public class TestFsDatasetImpl {
expectedNumVolumes, dataset.storageMap.size()); expectedNumVolumes, dataset.storageMap.size());
try { try {
dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(), dataset.asyncDiskService.execute(volumesToRemove.iterator().next(),
new Runnable() { new Runnable() {
@Override @Override
public void run() {} public void run() {}
@ -248,8 +248,9 @@ public class TestFsDatasetImpl {
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
List<StorageLocation> volumesToRemove = Arrays.asList(loc); Set<File> volumesToRemove = new HashSet<>();
dataset.removeVolumes(volumesToRemove); volumesToRemove.add(loc.getFile());
dataset.removeVolumes(volumesToRemove, true);
assertEquals(numExistingVolumes, dataset.getVolumes().size()); assertEquals(numExistingVolumes, dataset.getVolumes().size());
} }
@ -278,12 +279,13 @@ public class TestFsDatasetImpl {
final FsVolumeImpl newVolume = mock(FsVolumeImpl.class); final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
final FsVolumeReference newRef = mock(FsVolumeReference.class); final FsVolumeReference newRef = mock(FsVolumeReference.class);
when(newRef.getVolume()).thenReturn(newVolume); when(newRef.getVolume()).thenReturn(newVolume);
when(newVolume.getBasePath()).thenReturn("data4");
FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1); FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
doAnswer(new Answer() { doAnswer(new Answer() {
@Override @Override
public Object answer(InvocationOnMock invocationOnMock) public Object answer(InvocationOnMock invocationOnMock)
throws Throwable { throws Throwable {
volumeList.removeVolume(new File("data4")); volumeList.removeVolume(new File("data4"), false);
volumeList.addVolume(newRef); volumeList.addVolume(newRef);
return null; return null;
} }