HDFS-9445. Datanode may deadlock while handling a bad volume. Contributed by Walter Su.

(cherry picked from commit a48301791e)
Kihwal Lee committed on 2015-12-11 08:46:03 -06:00
parent 7dc558b6a7
commit a72ef921f0
3 changed files with 58 additions and 36 deletions

CHANGES.txt

@@ -1709,6 +1709,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-9294. DFSClient deadlock when close file and failed to renew lease.
     (Brahma Reddy Battula via szetszwo)
 
+    HDFS-9445. Datanode may deadlock while handling a bad volume.
+    (Walter Su via Kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

FsDatasetImpl.java

@@ -475,18 +475,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * Removes a set of volumes from FsDataset.
    * @param volumesToRemove a set of absolute root path of each volume.
    * @param clearFailure set true to clear failure information.
-   *
-   * DataNode should call this function before calling
-   * {@link DataStorage#removeVolumes(java.util.Collection)}.
    */
   @Override
-  public synchronized void removeVolumes(
-      Set<File> volumesToRemove, boolean clearFailure) {
+  public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
     // Make sure that all volumes are absolute path.
     for (File vol : volumesToRemove) {
       Preconditions.checkArgument(vol.isAbsolute(),
           String.format("%s is not absolute path.", vol.getPath()));
     }
+
+    Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
+    List<String> storageToRemove = new ArrayList<>();
+    synchronized (this) {
     for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
       Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
       final File absRoot = sd.getRoot().getAbsoluteFile();
@@ -497,28 +497,47 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         asyncDiskService.removeVolume(sd.getCurrentDir());
         volumes.removeVolume(absRoot, clearFailure);
-        // Removed all replica information for the blocks on the volume. Unlike
-        // updating the volumeMap in addVolume(), this operation does not scan
-        // disks.
+        // Removed all replica information for the blocks on the volume.
+        // Unlike updating the volumeMap in addVolume(), this operation does
+        // not scan disks.
         for (String bpid : volumeMap.getBlockPoolList()) {
+          List<ReplicaInfo> blocks = new ArrayList<>();
           for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
               it.hasNext(); ) {
             ReplicaInfo block = it.next();
             final File absBasePath =
                 new File(block.getVolume().getBasePath()).getAbsoluteFile();
             if (absBasePath.equals(absRoot)) {
-              invalidate(bpid, block);
+              blocks.add(block);
               it.remove();
             }
           }
+          blkToInvalidate.put(bpid, blocks);
         }
-        storageMap.remove(sd.getStorageUuid());
+        storageToRemove.add(sd.getStorageUuid());
       }
     }
     setupAsyncLazyPersistThreads();
   }
+
+    // Call this outside the lock.
+    for (Map.Entry<String, List<ReplicaInfo>> entry :
+        blkToInvalidate.entrySet()) {
+      String bpid = entry.getKey();
+      List<ReplicaInfo> blocks = entry.getValue();
+      for (ReplicaInfo block : blocks) {
+        invalidate(bpid, block);
+      }
+    }
+
+    synchronized (this) {
+      for(String storageUuid : storageToRemove) {
+        storageMap.remove(storageUuid);
+      }
+    }
+  }
 
   private StorageType getStorageTypeFromLocations(
       Collection<StorageLocation> dataLocations, File dir) {
     for (StorageLocation dataLocation : dataLocations) {
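The substance of the hunk above is the narrowed locking: removeVolumes() is no longer a synchronized method. Replicas on the failed volume are collected and dropped from volumeMap while the dataset monitor is held, invalidate() runs over the collected replicas only after the monitor is released, and the monitor is re-acquired briefly to purge storageMap. Below is a minimal, self-contained sketch of that collect-then-act-outside-the-lock pattern; the class, field names, and the startsWith() check are illustrative stand-ins, not the real FsDatasetImpl code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for FsDatasetImpl that shows only the locking
 * pattern of the fix: mutate the in-memory maps and record follow-up work
 * under the monitor, run the slow invalidation calls after the monitor is
 * released, then re-acquire it briefly for the final map cleanup.
 */
class VolumeRemovalSketch {
  private final Map<String, List<String>> replicasByPool = new HashMap<>();
  private final Map<String, Object> storageMap = new HashMap<>();

  void removeVolume(String volumeRoot) {
    Map<String, List<String>> toInvalidate = new HashMap<>();
    List<String> storagesToRemove = new ArrayList<>();

    synchronized (this) {
      // Step 1, under the lock: drop replicas of the bad volume from the
      // replica map and remember them for later invalidation.
      for (Map.Entry<String, List<String>> e : replicasByPool.entrySet()) {
        List<String> removed = new ArrayList<>();
        e.getValue().removeIf(block -> {
          boolean onVolume = block.startsWith(volumeRoot);
          if (onVolume) {
            removed.add(block);
          }
          return onVolume;
        });
        toInvalidate.put(e.getKey(), removed);
      }
      storagesToRemove.add(volumeRoot);
    }

    // Step 2, outside the lock: the calls that may block or take other
    // locks (in FsDatasetImpl: short-circuit registry, cache manager, and
    // the NameNode notification done by invalidate()).
    for (Map.Entry<String, List<String>> e : toInvalidate.entrySet()) {
      for (String block : e.getValue()) {
        invalidate(e.getKey(), block);
      }
    }

    // Step 3, briefly back under the lock: remove the storage entries.
    synchronized (this) {
      for (String storage : storagesToRemove) {
        storageMap.remove(storage);
      }
    }
  }

  private void invalidate(String bpid, String block) {
    // Placeholder for the follow-up work FsDatasetImpl#invalidate performs.
    System.out.println("invalidate " + bpid + " " + block);
  }

  public static void main(String[] args) {
    VolumeRemovalSketch ds = new VolumeRemovalSketch();
    ds.replicasByPool.put("bp-1",
        new ArrayList<>(Arrays.asList("/data1/blk_1", "/data2/blk_2")));
    ds.storageMap.put("/data1", new Object());
    ds.removeVolume("/data1");
  }
}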
@@ -1936,15 +1955,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   public void invalidate(String bpid, ReplicaInfo block) {
     // If a DFSClient has the replica in its cache of short-circuit file
     // descriptors (and the client is using ShortCircuitShm), invalidate it.
-    // The short-circuit registry is null in the unit tests, because the
-    // datanode is mock object.
-    if (datanode.getShortCircuitRegistry() != null) {
     datanode.getShortCircuitRegistry().processBlockInvalidation(
         new ExtendedBlockId(block.getBlockId(), bpid));
 
     // If the block is cached, start uncaching it.
     cacheManager.uncacheBlock(bpid, block.getBlockId());
-    }
 
     datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
         block.getStorageUuid());
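The removed guard existed only because unit tests mock the DataNode and used to return a null short-circuit registry; with it gone, invalidate() assumes getShortCircuitRegistry() is always non-null, so any test whose mocked DataNode can reach this path has to stub the registry, which is exactly what the TestFsDatasetImpl change below does. A small hedged sketch of that stubbing follows; the class and test names here are illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.junit.Test;
import org.mockito.Mockito;

public class MockedDataNodeRegistrySketch {
  @Test
  public void mockedDataNodeSuppliesARegistry() throws IOException {
    // A plain mock would return null from getShortCircuitRegistry(),
    // which invalidate() no longer tolerates, so hand it a real registry.
    DataNode datanode = Mockito.mock(DataNode.class);
    ShortCircuitRegistry registry =
        new ShortCircuitRegistry(new Configuration());
    Mockito.when(datanode.getShortCircuitRegistry()).thenReturn(registry);
    // ... build the FsDatasetImpl under test around this mock, as
    // TestFsDatasetImpl#setUp does.
  }
}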

TestFsDatasetImpl.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -147,6 +148,9 @@ public class TestFsDatasetImpl {
     when(datanode.getDnConf()).thenReturn(dnConf);
     final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
     when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
+    final ShortCircuitRegistry shortCircuitRegistry =
+        new ShortCircuitRegistry(conf);
+    when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
     createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
     dataset = new FsDatasetImpl(datanode, storage, conf);