HDFS-15386. ReplicaNotFoundException keeps happening in DN after removing multiple DN's data directories (#2052)
Contributed by Toshihiro Suzuki.
(cherry picked from commit 545a0a147c)
parent a266e32d82
commit ec8f3714e0
FsDatasetImpl.java

@@ -575,7 +575,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Unlike updating the volumeMap in addVolume(), this operation does
       // not scan disks.
       for (String bpid : volumeMap.getBlockPoolList()) {
-        List<ReplicaInfo> blocks = new ArrayList<>();
+        List<ReplicaInfo> blocks = blkToInvalidate
+            .computeIfAbsent(bpid, (k) -> new ArrayList<>());
         for (Iterator<ReplicaInfo> it =
             volumeMap.replicas(bpid).iterator(); it.hasNext();) {
           ReplicaInfo block = it.next();
@@ -588,9 +589,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             it.remove();
           }
         }
-        blkToInvalidate.put(bpid, blocks);
       }
-
       storageToRemove.add(sd.getStorageUuid());
       storageLocationsToRemove.remove(sdLocation);
     }
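The production change above is the substance of the fix. removeVolumes() walks every storage directory being removed and, for each one, collects that directory's replicas per block pool. With put(), the list built for a later directory replaced the list already stored in blkToInvalidate for the same block pool, so replicas on the earlier directory were dropped from the volume map but never invalidated or reported as deleted to the NameNode, which is what lets stale replica references linger and surface as the ReplicaNotFoundException in the title. computeIfAbsent() reuses the existing list, so replicas from every removed directory accumulate. A minimal, self-contained sketch of the difference using plain java.util collections (illustrative names only, not the DataNode code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentSketch {
  public static void main(String[] args) {
    Map<String, List<String>> blkToInvalidate = new HashMap<>();
    String bpid = "BP-1"; // one block pool touched by two removed volumes

    // Buggy pattern: a fresh list is built per removed volume, and put()
    // replaces whatever an earlier volume already contributed for this pool.
    List<String> fromVol1 = new ArrayList<>();
    fromVol1.add("blk_on_vol1");
    blkToInvalidate.put(bpid, fromVol1);
    List<String> fromVol2 = new ArrayList<>();
    fromVol2.add("blk_on_vol2");
    blkToInvalidate.put(bpid, fromVol2);
    System.out.println(blkToInvalidate.get(bpid)); // [blk_on_vol2] -- vol1's block was lost

    // Fixed pattern: computeIfAbsent() returns the list already in the map
    // (creating it only on the first call), so blocks accumulate.
    blkToInvalidate.clear();
    blkToInvalidate.computeIfAbsent(bpid, (k) -> new ArrayList<>()).add("blk_on_vol1");
    blkToInvalidate.computeIfAbsent(bpid, (k) -> new ArrayList<>()).add("blk_on_vol2");
    System.out.println(blkToInvalidate.get(bpid)); // [blk_on_vol1, blk_on_vol2]
  }
}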
TestFsDatasetImpl.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
-import java.io.FileInputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
@@ -106,6 +105,8 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.slf4j.Logger;
@@ -268,16 +269,24 @@ public class TestFsDatasetImpl {
   }
 
   @Test(timeout = 30000)
-  public void testRemoveVolumes() throws IOException {
+  public void testRemoveOneVolume() throws IOException {
     // Feed FsDataset with block metadata.
-    final int NUM_BLOCKS = 100;
-    for (int i = 0; i < NUM_BLOCKS; i++) {
-      String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length];
+    final int numBlocks = 100;
+    for (int i = 0; i < numBlocks; i++) {
+      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
       ExtendedBlock eb = new ExtendedBlock(bpid, i);
-      try (ReplicaHandler replica =
-          dataset.createRbw(StorageType.DEFAULT, null, eb, false)) {
+      ReplicaHandler replica = null;
+      try {
+        replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+            false);
+      } finally {
+        if (replica != null) {
+          replica.close();
+        }
       }
     }
+
+    // Remove one volume
     final String[] dataDirs =
         conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
     final String volumePathToRemove = dataDirs[0];
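In the test, the setup loop now manages the ReplicaHandler with an explicit try/finally instead of try-with-resources. Both forms guarantee the handler is closed; the commit does not state the motivation for the switch, so the following is only a generic illustration of the two equivalent patterns with a hypothetical AutoCloseable stand-in, not anything HDFS-specific:

public class CloseInFinallySketch {
  // Hypothetical stand-in for ReplicaHandler: any AutoCloseable handle.
  static class Handle implements AutoCloseable {
    @Override
    public void close() {
      System.out.println("closed");
    }
  }

  static Handle open() {
    return new Handle();
  }

  public static void main(String[] args) {
    // Style used before the patch: try-with-resources closes the handle automatically.
    try (Handle h = open()) {
      // use h
    }

    // Style used after the patch: explicit finally with a null guard, mirroring
    // the createRbw(...) / replica.close() sequence in testRemoveOneVolume.
    Handle h = null;
    try {
      h = open();
      // use h
    } finally {
      if (h != null) {
        h.close();
      }
    }
  }
}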
@@ -300,6 +309,11 @@ public class TestFsDatasetImpl {
     assertEquals("The volume has been removed from the storageMap.",
         expectedNumVolumes, dataset.storageMap.size());
 
+    // DataNode.notifyNamenodeDeletedBlock() should be called 50 times
+    // as we deleted one volume that has 50 blocks
+    verify(datanode, times(50))
+        .notifyNamenodeDeletedBlock(any(), any());
+
     try {
       dataset.asyncDiskService.execute(volumeToRemove,
           new Runnable() {
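The new assertion uses Mockito's verify(mock, times(n)) to count invocations on the mocked DataNode: per the comment above, removing one volume that holds 50 of the test's 100 blocks should produce exactly 50 notifyNamenodeDeletedBlock() calls. A stripped-down sketch of the same verification idiom against a hypothetical listener interface (assuming Mockito 2+, where any() lives in ArgumentMatchers; these are not the HDFS classes):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class VerifyTimesSketch {
  // Hypothetical callback interface standing in for the mocked DataNode.
  interface DeletionListener {
    void blockDeleted(String blockPoolId, String blockId);
  }

  public static void main(String[] args) {
    DeletionListener listener = mock(DeletionListener.class);

    // Simulate removing a volume that held three replicas.
    for (int i = 0; i < 3; i++) {
      listener.blockDeleted("BP-1", "blk_" + i);
    }

    // Fails unless the mock recorded exactly three matching invocations,
    // mirroring verify(datanode, times(50)).notifyNamenodeDeletedBlock(any(), any()).
    verify(listener, times(3)).blockDeleted(any(), any());
    System.out.println("verified 3 deletions");
  }
}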
@@ -317,10 +331,81 @@
       totalNumReplicas += dataset.volumeMap.size(bpid);
     }
     assertEquals("The replica infos on this volume has been removed from the "
-        + "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
+        + "volumeMap.", numBlocks / NUM_INIT_VOLUMES,
         totalNumReplicas);
   }
 
+  @Test(timeout = 30000)
+  public void testRemoveTwoVolumes() throws IOException {
+    // Feed FsDataset with block metadata.
+    final int numBlocks = 100;
+    for (int i = 0; i < numBlocks; i++) {
+      String bpid = BLOCK_POOL_IDS[numBlocks % BLOCK_POOL_IDS.length];
+      ExtendedBlock eb = new ExtendedBlock(bpid, i);
+      ReplicaHandler replica = null;
+      try {
+        replica = dataset.createRbw(StorageType.DEFAULT, null, eb,
+            false);
+      } finally {
+        if (replica != null) {
+          replica.close();
+        }
+      }
+    }
+
+    // Remove two volumes
+    final String[] dataDirs =
+        conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(",");
+    Set<StorageLocation> volumesToRemove = new HashSet<>();
+    volumesToRemove.add(StorageLocation.parse(dataDirs[0]));
+    volumesToRemove.add(StorageLocation.parse(dataDirs[1]));
+
+    FsVolumeReferences volReferences = dataset.getFsVolumeReferences();
+    Set<FsVolumeImpl> volumes = new HashSet<>();
+    for (FsVolumeSpi vol: volReferences) {
+      for (StorageLocation volume : volumesToRemove) {
+        if (vol.getStorageLocation().equals(volume)) {
+          volumes.add((FsVolumeImpl) vol);
+        }
+      }
+    }
+    assertEquals(2, volumes.size());
+    volReferences.close();
+
+    dataset.removeVolumes(volumesToRemove, true);
+    int expectedNumVolumes = dataDirs.length - 2;
+    assertEquals("The volume has been removed from the volumeList.",
+        expectedNumVolumes, getNumVolumes());
+    assertEquals("The volume has been removed from the storageMap.",
+        expectedNumVolumes, dataset.storageMap.size());
+
+    // DataNode.notifyNamenodeDeletedBlock() should be called 100 times
+    // as we deleted 2 volumes that have 100 blocks totally
+    verify(datanode, times(100))
+        .notifyNamenodeDeletedBlock(any(), any());
+
+    for (FsVolumeImpl volume : volumes) {
+      try {
+        dataset.asyncDiskService.execute(volume,
+            new Runnable() {
+              @Override
+              public void run() {}
+            });
+        fail("Expect RuntimeException: the volume has been removed from the "
+            + "AsyncDiskService.");
+      } catch (RuntimeException e) {
+        GenericTestUtils.assertExceptionContains("Cannot find volume", e);
+      }
+    }
+
+    int totalNumReplicas = 0;
+    for (String bpid : dataset.volumeMap.getBlockPoolList()) {
+      totalNumReplicas += dataset.volumeMap.size(bpid);
+    }
+    assertEquals("The replica infos on this volume has been removed from the "
+        + "volumeMap.", 0, totalNumReplicas);
+  }
+
   @Test(timeout = 5000)
   public void testRemoveNewlyAddedVolume() throws IOException {
     final int numExistingVolumes = getNumVolumes();