diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 957b3066d8b..52c1b9d0506 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -557,7 +557,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           // Unlike updating the volumeMap in addVolume(), this operation does
           // not scan disks.
           for (String bpid : volumeMap.getBlockPoolList()) {
-            List<ReplicaInfo> blocks = new ArrayList<>();
+            List<ReplicaInfo> blocks = blkToInvalidate
+                .computeIfAbsent(bpid, (k) -> new ArrayList<>());
             for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
                  it.hasNext();) {
               ReplicaInfo block = it.next();
@@ -570,9 +571,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
                 it.remove();
               }
             }
-            blkToInvalidate.put(bpid, blocks);
           }
-
           storageToRemove.add(sd.getStorageUuid());
           storageLocationsToRemove.remove(sdLocation);
         }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 51a6e6e795c..bb92dedf051 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import org.apache.commons.io.FileUtils;
@@ -105,6 +106,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.slf4j.Logger;
@@ -264,16 +267,24 @@ public class TestFsDatasetImpl {
   }
 
   @Test(timeout = 30000)
-  public void testRemoveVolumes() throws IOException {
+  public void testRemoveOneVolume() throws IOException {
     // Feed FsDataset with block metadata.
-    final int NUM_BLOCKS = 100;
-    for (int i = 0; i < NUM_BLOCKS; i++) {
-      String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
+    final int numBlocks = 100;
+    for (int i = 0; i < numBlocks; i++) {
+      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
-      try (ReplicaHandler replica =
-          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
+      ReplicaHandler replica = null;
+      try {
+        replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+            false);
+      } finally {
+        if (replica != null) {
+          replica.close();
+        }
       }
     }
+
+    // Remove one volume
     final String[] dataDirs =
         conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
     final String volumePathToRemove = dataDirs[0];
@@ -296,6 +307,11 @@ public class TestFsDatasetImpl {
     assertEquals("The volume has been removed from the storageMap.",
         expectedNumVolumes, dataset.storageMap.size());
 
+    // DataNode.notifyNamenodeDeletedBlock() should be called 50 times
+    // as we deleted one volume that has 50 blocks
+    verify(datanode, times(50))
+        .notifyNamenodeDeletedBlock(any(), any());
+
     try {
       dataset.asyncDiskService.execute(volumeToRemove,
           new Runnable() {
@@ -313,10 +329,81 @@ public class TestFsDatasetImpl {
       totalNumReplicas += dataset.volumeMap.size(bpid);
     }
     assertEquals("The replica infos on this volume has been removed from the "
-        + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
+        + "volumeMap.", numBlocks / NUM_INIT_VOLUMES,
         totalNumReplicas);
   }
 
+  @Test(timeout = 30000)
+  public void testRemoveTwoVolumes() throws IOException {
+    // Feed FsDataset with block metadata.
+    final int numBlocks = 100;
+    for (int i = 0; i < numBlocks; i++) {
+      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
+      ExtendedBlock eb = new ExtendedBlock(bpid, i);
+      ReplicaHandler replica = null;
+      try {
+        replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+            false);
+      } finally {
+        if (replica != null) {
+          replica.close();
+        }
+      }
+    }
+
+    // Remove two volumes
+    final String[] dataDirs =
+        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
+    Set<StorageLocation> volumesToRemove = new HashSet<>();
+    volumesToRemove.add(StorageLocation.parse(dataDirs[0]));
+    volumesToRemove.add(StorageLocation.parse(dataDirs[1]));
+
+    FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
+    Set<FsVolumeImpl> volumes = new HashSet<>();
+    for (FsVolumeSpi vol : volReferences) {
+      for (StorageLocation volume : volumesToRemove) {
+        if (vol.getStorageLocation().equals(volume)) {
+          volumes.add((FsVolumeImpl) vol);
+        }
+      }
+    }
+    assertEquals(2, volumes.size());
+    volReferences.close();
+
+    dataset.removeVolumes(volumesToRemove, true);
+    int expectedNumVolumes = dataDirs.length - 2;
+    assertEquals("The volume has been removed from the volumeList.",
+        expectedNumVolumes, getNumVolumes());
+    assertEquals("The volume has been removed from the storageMap.",
+        expectedNumVolumes, dataset.storageMap.size());
+
+    // DataNode.notifyNamenodeDeletedBlock() should be called 100 times
+    // as we deleted 2 volumes that have 100 blocks totally
+    verify(datanode, times(100))
+        .notifyNamenodeDeletedBlock(any(), any());
+
+    for (FsVolumeImpl volume : volumes) {
+      try {
+        dataset.asyncDiskService.execute(volume,
+            new Runnable() {
+              @Override
+              public void run() {}
+            });
+        fail("Expect RuntimeException: the volume has been removed from the "
+            + "AsyncDiskService.");
+      } catch (RuntimeException e) {
+        GenericTestUtils.assertExceptionContains("Cannot find volume", e);
+      }
+    }
+
+    int totalNumReplicas = 0;
+    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+      totalNumReplicas += dataset.volumeMap.size(bpid);
+    }
+    assertEquals("The replica infos on this volume has been removed from the "
+        + "volumeMap.", 0, totalNumReplicas);
+  }
+
   @Test(timeout = 5000)
   public void testRemoveNewlyAddedVolume() throws IOException {
     final int numExistingVolumes = getNumVolumes();
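
Note on the production change above: removeVolumes() runs its per-block-pool collection loop once for every storage directory being removed, and the old blkToInvalidate.put(bpid, blocks) replaced whatever an earlier iteration had already collected for the same block pool. When two volumes were removed in one call, only the last volume's blocks were handed to the NameNode for invalidation; that is exactly what the new testRemoveTwoVolumes pins down by expecting 100 notifyNamenodeDeletedBlock() calls. The following standalone sketch reproduces the difference outside Hadoop; the class name, block-pool id, volume names, and the String stand-ins for ReplicaInfo are invented for illustration and are not part of the patch.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical, self-contained sketch; not part of the patch.
    public class BlkToInvalidateSketch {
      public static void main(String[] args) {
        String bpid = "BP-1";
        Map<String, List<String>> blkToInvalidate = new HashMap<>();

        // Old behavior: a fresh list is built per removed volume, and put()
        // overwrites the list collected for this block pool on the previous
        // volume's pass.
        for (String volume : new String[] {"vol0", "vol1"}) {
          List<String> blocks = new ArrayList<>();
          blocks.add("blk-on-" + volume);
          blkToInvalidate.put(bpid, blocks);
        }
        System.out.println("put():             " + blkToInvalidate.get(bpid));

        blkToInvalidate.clear();

        // Fixed behavior: computeIfAbsent() returns the list already stored
        // for the block pool on later passes, so blocks from every removed
        // volume accumulate.
        for (String volume : new String[] {"vol0", "vol1"}) {
          List<String> blocks =
              blkToInvalidate.computeIfAbsent(bpid, (k) -> new ArrayList<>());
          blocks.add("blk-on-" + volume);
        }
        System.out.println("computeIfAbsent(): " + blkToInvalidate.get(bpid));
      }
    }

Run standalone, the put() variant prints [blk-on-vol1] while the computeIfAbsent() variant prints [blk-on-vol0, blk-on-vol1], which mirrors why testRemoveOneVolume sees 50 deletion notifications for a single volume and testRemoveTwoVolumes can now expect the full 100.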