HDFS-7722. DataNode#checkDiskError should also remove Storage when error is found. (Lei Xu via Colin P. McCabe)
(cherry picked from commit b49c3a1813
)
This commit is contained in:
parent
981a1fa815
commit
7455412a24
|
@ -859,6 +859,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HDFS-7806. Refactor: move StorageType from hadoop-hdfs to
|
||||
hadoop-common. (Xiaoyu Yao via Arpit Agarwal)
|
||||
|
||||
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
|
||||
is found. (Lei Xu via Colin P. McCabe)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -53,6 +53,7 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -73,6 +74,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -616,20 +618,16 @@ public class DataNode extends ReconfigurableBase
|
|||
errorMessageBuilder.append(
|
||||
String.format("FAILED to ADD: %s: %s%n", volume,
|
||||
e.toString()));
|
||||
LOG.error("Failed to add volume: " + volume, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!changedVolumes.deactivateLocations.isEmpty()) {
|
||||
LOG.info("Deactivating volumes: " +
|
||||
Joiner.on(",").join(changedVolumes.deactivateLocations));
|
||||
|
||||
data.removeVolumes(changedVolumes.deactivateLocations);
|
||||
try {
|
||||
storage.removeVolumes(changedVolumes.deactivateLocations);
|
||||
removeVolumes(changedVolumes.deactivateLocations);
|
||||
} catch (IOException e) {
|
||||
errorMessageBuilder.append(e.getMessage());
|
||||
}
|
||||
LOG.error("Failed to remove volume: " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
if (errorMessageBuilder.length() > 0) {
|
||||
|
@ -642,6 +640,79 @@ public class DataNode extends ReconfigurableBase
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove volumes from DataNode.
|
||||
* See {@link removeVolumes(final Set<File>, boolean)} for details.
|
||||
*
|
||||
* @param locations the StorageLocations of the volumes to be removed.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void removeVolumes(final Collection<StorageLocation> locations)
|
||||
throws IOException {
|
||||
if (locations.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Set<File> volumesToRemove = new HashSet<>();
|
||||
for (StorageLocation loc : locations) {
|
||||
volumesToRemove.add(loc.getFile().getAbsoluteFile());
|
||||
}
|
||||
removeVolumes(volumesToRemove, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove volumes from DataNode.
|
||||
*
|
||||
* It does three things:
|
||||
* <li>
|
||||
* <ul>Remove volumes and block info from FsDataset.</ul>
|
||||
* <ul>Remove volumes from DataStorage.</ul>
|
||||
* <ul>Reset configuration DATA_DIR and {@link dataDirs} to represent
|
||||
* active volumes.</ul>
|
||||
* </li>
|
||||
* @param absoluteVolumePaths the absolute path of volumes.
|
||||
* @param clearFailure if true, clears the failure information related to the
|
||||
* volumes.
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized void removeVolumes(
|
||||
final Set<File> absoluteVolumePaths, boolean clearFailure)
|
||||
throws IOException {
|
||||
for (File vol : absoluteVolumePaths) {
|
||||
Preconditions.checkArgument(vol.isAbsolute());
|
||||
}
|
||||
|
||||
if (absoluteVolumePaths.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info(String.format("Deactivating volumes (clear failure=%b): %s",
|
||||
clearFailure, Joiner.on(",").join(absoluteVolumePaths)));
|
||||
|
||||
IOException ioe = null;
|
||||
// Remove volumes and block infos from FsDataset.
|
||||
data.removeVolumes(absoluteVolumePaths, clearFailure);
|
||||
|
||||
// Remove volumes from DataStorage.
|
||||
try {
|
||||
storage.removeVolumes(absoluteVolumePaths);
|
||||
} catch (IOException e) {
|
||||
ioe = e;
|
||||
}
|
||||
|
||||
// Set configuration and dataDirs to reflect volume changes.
|
||||
for (Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext(); ) {
|
||||
StorageLocation loc = it.next();
|
||||
if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, Joiner.on(",").join(dataDirs));
|
||||
|
||||
if (ioe != null) {
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void setClusterId(final String nsCid, final String bpid
|
||||
) throws IOException {
|
||||
if(clusterId != null && !clusterId.equals(nsCid)) {
|
||||
|
@ -3083,10 +3154,20 @@ public class DataNode extends ReconfigurableBase
|
|||
* Check the disk error
|
||||
*/
|
||||
private void checkDiskError() {
|
||||
Set<File> unhealthyDataDirs = data.checkDataDir();
|
||||
if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
|
||||
try {
|
||||
data.checkDataDir();
|
||||
} catch (DiskErrorException de) {
|
||||
handleDiskError(de.getMessage());
|
||||
// Remove all unhealthy volumes from DataNode.
|
||||
removeVolumes(unhealthyDataDirs, false);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error occurred when removing unhealthy storage dirs: "
|
||||
+ e.getMessage(), e);
|
||||
}
|
||||
StringBuilder sb = new StringBuilder("DataNode failed volumes:");
|
||||
for (File dataDir : unhealthyDataDirs) {
|
||||
sb.append(dataDir.getAbsolutePath() + ";");
|
||||
}
|
||||
handleDiskError(sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -404,28 +404,23 @@ public class DataStorage extends Storage {
|
|||
}
|
||||
|
||||
/**
|
||||
* Remove volumes from DataStorage. All volumes are removed even when the
|
||||
* Remove storage dirs from DataStorage. All storage dirs are removed even when the
|
||||
* IOException is thrown.
|
||||
*
|
||||
* @param locations a collection of volumes.
|
||||
* @param dirsToRemove a set of storage directories to be removed.
|
||||
* @throws IOException if I/O error when unlocking storage directory.
|
||||
*/
|
||||
synchronized void removeVolumes(Collection<StorageLocation> locations)
|
||||
synchronized void removeVolumes(final Set<File> dirsToRemove)
|
||||
throws IOException {
|
||||
if (locations.isEmpty()) {
|
||||
if (dirsToRemove.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Set<File> dataDirs = new HashSet<File>();
|
||||
for (StorageLocation sl : locations) {
|
||||
dataDirs.add(sl.getFile());
|
||||
}
|
||||
|
||||
StringBuilder errorMsgBuilder = new StringBuilder();
|
||||
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
|
||||
it.hasNext(); ) {
|
||||
StorageDirectory sd = it.next();
|
||||
if (dataDirs.contains(sd.getRoot())) {
|
||||
if (dirsToRemove.contains(sd.getRoot())) {
|
||||
// Remove the block pool level storage first.
|
||||
for (Map.Entry<String, BlockPoolSliceStorage> entry :
|
||||
this.bpStorageMap.entrySet()) {
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.io.InputStream;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -113,9 +114,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* If the FSDataset supports block scanning, this function removes
|
||||
* the volumes from the block scanner.
|
||||
*
|
||||
* @param volumes The storage locations of the volumes to remove.
|
||||
* @param volumes The paths of the volumes to be removed.
|
||||
* @param clearFailure set true to clear the failure information about the
|
||||
* volumes.
|
||||
*/
|
||||
public void removeVolumes(Collection<StorageLocation> volumes);
|
||||
public void removeVolumes(Set<File> volumes, boolean clearFailure);
|
||||
|
||||
/** @return a storage with the given storage ID */
|
||||
public DatanodeStorage getStorage(final String storageUuid);
|
||||
|
@ -388,9 +391,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
|
||||
/**
|
||||
* Check if all the data directories are healthy
|
||||
* @throws DiskErrorException
|
||||
* @return A set of unhealthy data directories.
|
||||
*/
|
||||
public void checkDataDir() throws DiskErrorException;
|
||||
public Set<File> checkDataDir();
|
||||
|
||||
/**
|
||||
* Shutdown the FSDataset
|
||||
|
|
|
@ -447,41 +447,42 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Removes a collection of volumes from FsDataset.
|
||||
* @param volumes the root directories of the volumes.
|
||||
* Removes a set of volumes from FsDataset.
|
||||
* @param volumesToRemove a set of absolute root path of each volume.
|
||||
* @param clearFailure set true to clear failure information.
|
||||
*
|
||||
* DataNode should call this function before calling
|
||||
* {@link DataStorage#removeVolumes(java.util.Collection)}.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
|
||||
Set<String> volumeSet = new HashSet<>();
|
||||
for (StorageLocation sl : volumes) {
|
||||
volumeSet.add(sl.getFile().getAbsolutePath());
|
||||
public synchronized void removeVolumes(
|
||||
Set<File> volumesToRemove, boolean clearFailure) {
|
||||
// Make sure that all volumes are absolute path.
|
||||
for (File vol : volumesToRemove) {
|
||||
Preconditions.checkArgument(vol.isAbsolute(),
|
||||
String.format("%s is not absolute path.", vol.getPath()));
|
||||
}
|
||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||
String volume = sd.getRoot().getAbsolutePath();
|
||||
if (volumeSet.contains(volume)) {
|
||||
LOG.info("Removing " + volume + " from FsDataset.");
|
||||
final File absRoot = sd.getRoot().getAbsoluteFile();
|
||||
if (volumesToRemove.contains(absRoot)) {
|
||||
LOG.info("Removing " + absRoot + " from FsDataset.");
|
||||
|
||||
// Disable the volume from the service.
|
||||
asyncDiskService.removeVolume(sd.getCurrentDir());
|
||||
this.volumes.removeVolume(sd.getRoot());
|
||||
volumes.removeVolume(absRoot, clearFailure);
|
||||
|
||||
// Removed all replica information for the blocks on the volume. Unlike
|
||||
// updating the volumeMap in addVolume(), this operation does not scan
|
||||
// disks.
|
||||
for (String bpid : volumeMap.getBlockPoolList()) {
|
||||
List<Block> blocks = new ArrayList<Block>();
|
||||
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
||||
it.hasNext(); ) {
|
||||
ReplicaInfo block = it.next();
|
||||
String absBasePath =
|
||||
new File(block.getVolume().getBasePath()).getAbsolutePath();
|
||||
if (absBasePath.equals(volume)) {
|
||||
final File absBasePath =
|
||||
new File(block.getVolume().getBasePath()).getAbsoluteFile();
|
||||
if (absBasePath.equals(absRoot)) {
|
||||
invalidate(bpid, block);
|
||||
blocks.add(block);
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
|
@ -1977,50 +1978,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
/**
|
||||
* check if a data directory is healthy
|
||||
* if some volumes failed - make sure to remove all the blocks that belong
|
||||
* to these volumes
|
||||
* @throws DiskErrorException
|
||||
*
|
||||
* if some volumes failed - the caller must emove all the blocks that belong
|
||||
* to these failed volumes.
|
||||
* @return the failed volumes. Returns null if no volume failed.
|
||||
*/
|
||||
@Override // FsDatasetSpi
|
||||
public void checkDataDir() throws DiskErrorException {
|
||||
long totalBlocks=0, removedBlocks=0;
|
||||
List<FsVolumeImpl> failedVols = volumes.checkDirs();
|
||||
|
||||
// If there no failed volumes return
|
||||
if (failedVols == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise remove blocks for the failed volumes
|
||||
long mlsec = Time.now();
|
||||
synchronized (this) {
|
||||
for (FsVolumeImpl fv: failedVols) {
|
||||
for (String bpid : fv.getBlockPoolList()) {
|
||||
Iterator<ReplicaInfo> ib = volumeMap.replicas(bpid).iterator();
|
||||
while(ib.hasNext()) {
|
||||
ReplicaInfo b = ib.next();
|
||||
totalBlocks++;
|
||||
// check if the volume block belongs to still valid
|
||||
if(b.getVolume() == fv) {
|
||||
LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
|
||||
+ " on failed volume " + fv.getCurrentDir().getAbsolutePath());
|
||||
ib.remove();
|
||||
removedBlocks++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} // end of sync
|
||||
mlsec = Time.now() - mlsec;
|
||||
LOG.warn("Removed " + removedBlocks + " out of " + totalBlocks +
|
||||
"(took " + mlsec + " millisecs)");
|
||||
|
||||
// report the error
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (FsVolumeImpl fv : failedVols) {
|
||||
sb.append(fv.getCurrentDir().getAbsolutePath() + ";");
|
||||
}
|
||||
throw new DiskErrorException("DataNode failed volumes:" + sb);
|
||||
public Set<File> checkDataDir() {
|
||||
return volumes.checkDirs();
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -289,7 +289,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
@VisibleForTesting
|
||||
public long getDfsUsed() throws IOException {
|
||||
long dfsUsed = 0;
|
||||
synchronized(dataset) {
|
||||
for(BlockPoolSlice s : bpSlices.values()) {
|
||||
|
|
|
@ -24,10 +24,12 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -218,16 +220,15 @@ class FsVolumeList {
|
|||
}
|
||||
|
||||
/**
|
||||
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
|
||||
* volumes from the active list that result in a DiskErrorException.
|
||||
* Calls {@link FsVolumeImpl#checkDirs()} on each volume.
|
||||
*
|
||||
* Use checkDirsMutext to allow only one instance of checkDirs() call
|
||||
*
|
||||
* @return list of all the removed volumes.
|
||||
* @return list of all the failed volumes.
|
||||
*/
|
||||
List<FsVolumeImpl> checkDirs() {
|
||||
Set<File> checkDirs() {
|
||||
synchronized(checkDirsMutex) {
|
||||
ArrayList<FsVolumeImpl> removedVols = null;
|
||||
Set<File> failedVols = null;
|
||||
|
||||
// Make a copy of volumes for performing modification
|
||||
final List<FsVolumeImpl> volumeList = getVolumes();
|
||||
|
@ -238,12 +239,12 @@ class FsVolumeList {
|
|||
fsv.checkDirs();
|
||||
} catch (DiskErrorException e) {
|
||||
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
|
||||
if (removedVols == null) {
|
||||
removedVols = new ArrayList<>(1);
|
||||
if (failedVols == null) {
|
||||
failedVols = new HashSet<>(1);
|
||||
}
|
||||
removedVols.add(fsv);
|
||||
removeVolume(fsv);
|
||||
failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile());
|
||||
addVolumeFailureInfo(fsv);
|
||||
removeVolume(fsv);
|
||||
} catch (ClosedChannelException e) {
|
||||
FsDatasetImpl.LOG.debug("Caught exception when obtaining " +
|
||||
"reference count on closed volume", e);
|
||||
|
@ -252,12 +253,12 @@ class FsVolumeList {
|
|||
}
|
||||
}
|
||||
|
||||
if (removedVols != null && removedVols.size() > 0) {
|
||||
FsDatasetImpl.LOG.warn("Completed checkDirs. Removed " + removedVols.size()
|
||||
+ " volumes. Current volumes: " + this);
|
||||
if (failedVols != null && failedVols.size() > 0) {
|
||||
FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size()
|
||||
+ " failure volumes.");
|
||||
}
|
||||
|
||||
return removedVols;
|
||||
return failedVols;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -290,6 +291,9 @@ class FsVolumeList {
|
|||
if (blockScanner != null) {
|
||||
blockScanner.addVolumeScanner(ref);
|
||||
}
|
||||
// If the volume is used to replace a failed volume, it needs to reset the
|
||||
// volume failure info for this volume.
|
||||
removeVolumeFailureInfo(new File(ref.getVolume().getBasePath()));
|
||||
FsDatasetImpl.LOG.info("Added new volume: " +
|
||||
ref.getVolume().getStorageID());
|
||||
}
|
||||
|
@ -337,8 +341,9 @@ class FsVolumeList {
|
|||
/**
|
||||
* Dynamically remove volume in the list.
|
||||
* @param volume the volume to be removed.
|
||||
* @param clearFailure set true to remove failure info for this volume.
|
||||
*/
|
||||
void removeVolume(File volume) {
|
||||
void removeVolume(File volume, boolean clearFailure) {
|
||||
// Make a copy of volumes to remove one volume.
|
||||
final FsVolumeImpl[] curVolumes = volumes.get();
|
||||
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
|
||||
|
@ -352,8 +357,10 @@ class FsVolumeList {
|
|||
removeVolume(fsVolume);
|
||||
}
|
||||
}
|
||||
if (clearFailure) {
|
||||
removeVolumeFailureInfo(volume);
|
||||
}
|
||||
}
|
||||
|
||||
VolumeFailureInfo[] getVolumeFailureInfos() {
|
||||
Collection<VolumeFailureInfo> infos = volumeFailureInfos.values();
|
||||
|
@ -366,7 +373,9 @@ class FsVolumeList {
|
|||
}
|
||||
|
||||
private void addVolumeFailureInfo(FsVolumeImpl vol) {
|
||||
addVolumeFailureInfo(new VolumeFailureInfo(vol.getBasePath(), Time.now(),
|
||||
addVolumeFailureInfo(new VolumeFailureInfo(
|
||||
new File(vol.getBasePath()).getAbsolutePath(),
|
||||
Time.now(),
|
||||
vol.getCapacity()));
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.HashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
|
@ -959,8 +960,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkDataDir() throws DiskErrorException {
|
||||
public Set<File> checkDataDir() {
|
||||
// nothing to check for simulated data set
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
|
@ -1281,7 +1283,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void removeVolumes(Collection<StorageLocation> volumes) {
|
||||
public synchronized void removeVolumes(Set<File> volumes, boolean clearFailure) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.fs.BlockLocation;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.BlockMissingException;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
|
@ -95,6 +97,8 @@ public class TestDataNodeHotSwapVolumes {
|
|||
conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
1000);
|
||||
/* Allow 1 volume failure */
|
||||
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
|
||||
|
||||
MiniDFSNNTopology nnTopology =
|
||||
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
|
||||
|
@ -646,4 +650,65 @@ public class TestDataNodeHotSwapVolumes {
|
|||
// this directory were removed from the previous step.
|
||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
|
||||
}
|
||||
|
||||
/** Get the FsVolume on the given basePath */
|
||||
private FsVolumeImpl getVolume(DataNode dn, File basePath) {
|
||||
for (FsVolumeSpi vol : dn.getFSDataset().getVolumes()) {
|
||||
if (vol.getBasePath().equals(basePath.getPath())) {
|
||||
return (FsVolumeImpl)vol;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
|
||||
* DataNode upon a volume failure. Thus we can run reconfig on the same
|
||||
* configuration to reload the new volume on the same directory as the failed one.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testDirectlyReloadAfterCheckDiskError()
|
||||
throws IOException, TimeoutException, InterruptedException,
|
||||
ReconfigurationException {
|
||||
startDFSCluster(1, 2);
|
||||
createFile(new Path("/test"), 32, (short)2);
|
||||
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
final String oldDataDir = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY);
|
||||
File dirToFail = new File(cluster.getDataDirectory(), "data1");
|
||||
|
||||
FsVolumeImpl failedVolume = getVolume(dn, dirToFail);
|
||||
assertTrue("No FsVolume was found for " + dirToFail,
|
||||
failedVolume != null);
|
||||
long used = failedVolume.getDfsUsed();
|
||||
|
||||
try {
|
||||
assertTrue("Couldn't chmod local vol: " + dirToFail,
|
||||
FileUtil.setExecutable(dirToFail, false));
|
||||
// Call and wait DataNode to detect disk failure.
|
||||
long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
|
||||
dn.checkDiskErrorAsync();
|
||||
while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
createFile(new Path("/test1"), 32, (short)2);
|
||||
assertEquals(used, failedVolume.getDfsUsed());
|
||||
} finally {
|
||||
// Need to restore the mode on dirToFail. Otherwise, if an Exception
|
||||
// is thrown above, the following tests can not delete this data directory
|
||||
// and thus fail to start MiniDFSCluster.
|
||||
assertTrue("Couldn't restore executable for: " + dirToFail,
|
||||
FileUtil.setExecutable(dirToFail, true));
|
||||
}
|
||||
|
||||
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir);
|
||||
|
||||
createFile(new Path("/test2"), 32, (short)2);
|
||||
FsVolumeImpl restoredVolume = getVolume(dn, dirToFail);
|
||||
assertTrue(restoredVolume != null);
|
||||
assertTrue(restoredVolume != failedVolume);
|
||||
// More data has been written to this volume.
|
||||
assertTrue(restoredVolume.getDfsUsed() > used);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
@ -31,6 +33,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -57,6 +60,10 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
|||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
|
@ -200,6 +207,69 @@ public class TestDataNodeVolumeFailure {
|
|||
" is created and replicated");
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that DataStorage and BlockPoolSliceStorage remove the failed volume
|
||||
* after failure.
|
||||
*/
|
||||
@Test(timeout=150000)
|
||||
public void testFailedVolumeBeingRemovedFromDataNode()
|
||||
throws InterruptedException, IOException, TimeoutException {
|
||||
Path file1 = new Path("/test1");
|
||||
DFSTestUtil.createFile(fs, file1, 1024, (short) 2, 1L);
|
||||
DFSTestUtil.waitReplication(fs, file1, (short) 2);
|
||||
|
||||
File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
|
||||
assertTrue(FileUtil.setExecutable(dn0Vol1, false));
|
||||
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||
long lastDiskErrorCheck = dn0.getLastDiskErrorCheck();
|
||||
dn0.checkDiskErrorAsync();
|
||||
// Wait checkDiskError thread finish to discover volume failure.
|
||||
while (dn0.getLastDiskErrorCheck() == lastDiskErrorCheck) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
// Verify dn0Vol1 has been completely removed from DN0.
|
||||
// 1. dn0Vol1 is removed from DataStorage.
|
||||
DataStorage storage = dn0.getStorage();
|
||||
assertEquals(1, storage.getNumStorageDirs());
|
||||
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
|
||||
Storage.StorageDirectory sd = storage.getStorageDir(i);
|
||||
assertFalse(sd.getRoot().getAbsolutePath().startsWith(
|
||||
dn0Vol1.getAbsolutePath()
|
||||
));
|
||||
}
|
||||
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
BlockPoolSliceStorage bpsStorage = storage.getBPStorage(bpid);
|
||||
assertEquals(1, bpsStorage.getNumStorageDirs());
|
||||
for (int i = 0; i < bpsStorage.getNumStorageDirs(); i++) {
|
||||
Storage.StorageDirectory sd = bpsStorage.getStorageDir(i);
|
||||
assertFalse(sd.getRoot().getAbsolutePath().startsWith(
|
||||
dn0Vol1.getAbsolutePath()
|
||||
));
|
||||
}
|
||||
|
||||
// 2. dn0Vol1 is removed from FsDataset
|
||||
FsDatasetSpi<? extends FsVolumeSpi> data = dn0.getFSDataset();
|
||||
for (FsVolumeSpi volume : data.getVolumes()) {
|
||||
assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(),
|
||||
dn0Vol1.getAbsoluteFile());
|
||||
}
|
||||
|
||||
// 3. all blocks on dn0Vol1 have been removed.
|
||||
for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) {
|
||||
assertNotNull(replica.getVolume());
|
||||
assertNotEquals(
|
||||
new File(replica.getVolume().getBasePath()).getAbsoluteFile(),
|
||||
dn0Vol1.getAbsoluteFile());
|
||||
}
|
||||
|
||||
// 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore.
|
||||
String[] dataDirStrs =
|
||||
dn0.getConf().get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
|
||||
assertEquals(1, dataDirStrs.length);
|
||||
assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that there are under replication blocks after vol failures
|
||||
*/
|
||||
|
|
|
@ -403,23 +403,6 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
checkFailuresAtNameNode(dm, dns.get(0), true, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Reconfigure each DataNode to remove its failed volumes.
|
||||
reconfigureDataNode(dns.get(0), dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, true);
|
||||
checkFailuresAtDataNode(dns.get(1), 1, true);
|
||||
|
||||
// NN sees reduced capacity, but no volume failures.
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
|
||||
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 0);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true);
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true);
|
||||
|
||||
// Reconfigure again to try to add back the failed volumes.
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
@ -460,6 +443,25 @@ public class TestDataNodeVolumeFailureReporting {
|
|||
checkAggregateFailuresAtNameNode(false, 2);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), false, dn1Vol1.getAbsolutePath());
|
||||
checkFailuresAtNameNode(dm, dns.get(1), false, dn2Vol1.getAbsolutePath());
|
||||
|
||||
// Replace failed volume with healthy volume and run reconfigure DataNode.
|
||||
// The failed volume information should be cleared.
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn1Vol1, true));
|
||||
assertTrue("Couldn't chmod local vol", FileUtil.setExecutable(dn2Vol1, true));
|
||||
reconfigureDataNode(dns.get(0), dn1Vol1, dn1Vol2);
|
||||
reconfigureDataNode(dns.get(1), dn2Vol1, dn2Vol2);
|
||||
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(0));
|
||||
DataNodeTestUtils.triggerHeartbeat(dns.get(1));
|
||||
|
||||
checkFailuresAtDataNode(dns.get(0), 1, true);
|
||||
checkFailuresAtDataNode(dns.get(1), 1, true);
|
||||
|
||||
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0,
|
||||
origCapacity, WAIT_FOR_HEARTBEATS);
|
||||
checkAggregateFailuresAtNameNode(true, 0);
|
||||
checkFailuresAtNameNode(dm, dns.get(0), true);
|
||||
checkFailuresAtNameNode(dm, dns.get(1), true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -61,8 +61,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void removeVolumes(Collection<StorageLocation> volumes) {
|
||||
|
||||
public void removeVolumes(Set<File> volumes, boolean clearFailure) {
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -243,8 +242,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void checkDataDir() throws DiskErrorException {
|
||||
throw new DiskChecker.DiskErrorException(null);
|
||||
public Set<File> checkDataDir() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -195,10 +195,10 @@ public class TestFsDatasetImpl {
|
|||
final String[] dataDirs =
|
||||
conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
|
||||
final String volumePathToRemove = dataDirs[0];
|
||||
List<StorageLocation> volumesToRemove = new ArrayList<StorageLocation>();
|
||||
volumesToRemove.add(StorageLocation.parse(volumePathToRemove));
|
||||
Set<File> volumesToRemove = new HashSet<>();
|
||||
volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile());
|
||||
|
||||
dataset.removeVolumes(volumesToRemove);
|
||||
dataset.removeVolumes(volumesToRemove, true);
|
||||
int expectedNumVolumes = dataDirs.length - 1;
|
||||
assertEquals("The volume has been removed from the volumeList.",
|
||||
expectedNumVolumes, dataset.getVolumes().size());
|
||||
|
@ -206,7 +206,7 @@ public class TestFsDatasetImpl {
|
|||
expectedNumVolumes, dataset.storageMap.size());
|
||||
|
||||
try {
|
||||
dataset.asyncDiskService.execute(volumesToRemove.get(0).getFile(),
|
||||
dataset.asyncDiskService.execute(volumesToRemove.iterator().next(),
|
||||
new Runnable() {
|
||||
@Override
|
||||
public void run() {}
|
||||
|
@ -248,8 +248,9 @@ public class TestFsDatasetImpl {
|
|||
|
||||
when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1);
|
||||
when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd);
|
||||
List<StorageLocation> volumesToRemove = Arrays.asList(loc);
|
||||
dataset.removeVolumes(volumesToRemove);
|
||||
Set<File> volumesToRemove = new HashSet<>();
|
||||
volumesToRemove.add(loc.getFile());
|
||||
dataset.removeVolumes(volumesToRemove, true);
|
||||
assertEquals(numExistingVolumes, dataset.getVolumes().size());
|
||||
}
|
||||
|
||||
|
@ -278,12 +279,13 @@ public class TestFsDatasetImpl {
|
|||
final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
|
||||
final FsVolumeReference newRef = mock(FsVolumeReference.class);
|
||||
when(newRef.getVolume()).thenReturn(newVolume);
|
||||
when(newVolume.getBasePath()).thenReturn("data4");
|
||||
FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
|
||||
doAnswer(new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock)
|
||||
throws Throwable {
|
||||
volumeList.removeVolume(new File("data4"));
|
||||
volumeList.removeVolume(new File("data4"), false);
|
||||
volumeList.addVolume(newRef);
|
||||
return null;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue