HDFS-9445. Datanode may deadlock while handling a bad volume. Contributed by Walter Su.
(cherry picked from commit a48301791e
)
This commit is contained in:
parent
d028295414
commit
937a43dc29
|
@ -1671,6 +1671,9 @@ Release 2.7.2 - UNRELEASED
|
|||
HDFS-9294. DFSClient deadlock when close file and failed to renew lease.
|
||||
(Brahma Reddy Battula via szetszwo)
|
||||
|
||||
HDFS-9445. Datanode may deadlock while handling a bad volume.
|
||||
(Wlater Su via Kihwal)
|
||||
|
||||
Release 2.7.1 - 2015-07-06
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -475,48 +475,67 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Removes a set of volumes from FsDataset.
|
||||
* @param volumesToRemove a set of absolute root path of each volume.
|
||||
* @param clearFailure set true to clear failure information.
|
||||
*
|
||||
* DataNode should call this function before calling
|
||||
* {@link DataStorage#removeVolumes(java.util.Collection)}.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void removeVolumes(
|
||||
Set<File> volumesToRemove, boolean clearFailure) {
|
||||
public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
|
||||
// Make sure that all volumes are absolute path.
|
||||
for (File vol : volumesToRemove) {
|
||||
Preconditions.checkArgument(vol.isAbsolute(),
|
||||
String.format("%s is not absolute path.", vol.getPath()));
|
||||
}
|
||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||
final File absRoot = sd.getRoot().getAbsoluteFile();
|
||||
if (volumesToRemove.contains(absRoot)) {
|
||||
LOG.info("Removing " + absRoot + " from FsDataset.");
|
||||
|
||||
// Disable the volume from the service.
|
||||
asyncDiskService.removeVolume(sd.getCurrentDir());
|
||||
volumes.removeVolume(absRoot, clearFailure);
|
||||
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
|
||||
List<String> storageToRemove = new ArrayList<>();
|
||||
synchronized (this) {
|
||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||
final File absRoot = sd.getRoot().getAbsoluteFile();
|
||||
if (volumesToRemove.contains(absRoot)) {
|
||||
LOG.info("Removing " + absRoot + " from FsDataset.");
|
||||
|
||||
// Removed all replica information for the blocks on the volume. Unlike
|
||||
// updating the volumeMap in addVolume(), this operation does not scan
|
||||
// disks.
|
||||
for (String bpid : volumeMap.getBlockPoolList()) {
|
||||
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
||||
it.hasNext(); ) {
|
||||
ReplicaInfo block = it.next();
|
||||
final File absBasePath =
|
||||
new File(block.getVolume().getBasePath()).getAbsoluteFile();
|
||||
if (absBasePath.equals(absRoot)) {
|
||||
invalidate(bpid, block);
|
||||
it.remove();
|
||||
// Disable the volume from the service.
|
||||
asyncDiskService.removeVolume(sd.getCurrentDir());
|
||||
volumes.removeVolume(absRoot, clearFailure);
|
||||
|
||||
// Removed all replica information for the blocks on the volume.
|
||||
// Unlike updating the volumeMap in addVolume(), this operation does
|
||||
// not scan disks.
|
||||
for (String bpid : volumeMap.getBlockPoolList()) {
|
||||
List<ReplicaInfo> blocks = new ArrayList<>();
|
||||
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
||||
it.hasNext(); ) {
|
||||
ReplicaInfo block = it.next();
|
||||
final File absBasePath =
|
||||
new File(block.getVolume().getBasePath()).getAbsoluteFile();
|
||||
if (absBasePath.equals(absRoot)) {
|
||||
blocks.add(block);
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
blkToInvalidate.put(bpid, blocks);
|
||||
}
|
||||
}
|
||||
|
||||
storageMap.remove(sd.getStorageUuid());
|
||||
storageToRemove.add(sd.getStorageUuid());
|
||||
}
|
||||
}
|
||||
setupAsyncLazyPersistThreads();
|
||||
}
|
||||
|
||||
// Call this outside the lock.
|
||||
for (Map.Entry<String, List<ReplicaInfo>> entry :
|
||||
blkToInvalidate.entrySet()) {
|
||||
String bpid = entry.getKey();
|
||||
List<ReplicaInfo> blocks = entry.getValue();
|
||||
for (ReplicaInfo block : blocks) {
|
||||
invalidate(bpid, block);
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
for(String storageUuid : storageToRemove) {
|
||||
storageMap.remove(storageUuid);
|
||||
}
|
||||
}
|
||||
setupAsyncLazyPersistThreads();
|
||||
}
|
||||
|
||||
private StorageType getStorageTypeFromLocations(
|
||||
|
@ -1934,15 +1953,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
public void invalidate(String bpid, ReplicaInfo block) {
|
||||
// If a DFSClient has the replica in its cache of short-circuit file
|
||||
// descriptors (and the client is using ShortCircuitShm), invalidate it.
|
||||
// The short-circuit registry is null in the unit tests, because the
|
||||
// datanode is mock object.
|
||||
if (datanode.getShortCircuitRegistry() != null) {
|
||||
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
||||
new ExtendedBlockId(block.getBlockId(), bpid));
|
||||
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
||||
new ExtendedBlockId(block.getBlockId(), bpid));
|
||||
|
||||
// If the block is cached, start uncaching it.
|
||||
cacheManager.uncacheBlock(bpid, block.getBlockId());
|
||||
}
|
||||
// If the block is cached, start uncaching it.
|
||||
cacheManager.uncacheBlock(bpid, block.getBlockId());
|
||||
|
||||
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
|
||||
block.getStorageUuid());
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
|||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||
|
@ -147,6 +148,9 @@ public class TestFsDatasetImpl {
|
|||
when(datanode.getDnConf()).thenReturn(dnConf);
|
||||
final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
|
||||
when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
|
||||
final ShortCircuitRegistry shortCircuitRegistry =
|
||||
new ShortCircuitRegistry(conf);
|
||||
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
|
||||
|
||||
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
|
||||
dataset = new FsDatasetImpl(datanode, storage, conf);
|
||||
|
|
Loading…
Reference in New Issue