HDFS-13677. Dynamic refresh Disk configuration results in overwriting VolumeMap. Contributed by xuzq.
This commit is contained in:
parent
95790bb7e5
commit
4b4200f1f8
|
@ -419,7 +419,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
LOG.error(errorMsg);
|
LOG.error(errorMsg);
|
||||||
throw new IOException(errorMsg);
|
throw new IOException(errorMsg);
|
||||||
}
|
}
|
||||||
volumeMap.addAll(replicaMap);
|
volumeMap.mergeAll(replicaMap);
|
||||||
storageMap.put(sd.getStorageUuid(),
|
storageMap.put(sd.getStorageUuid(),
|
||||||
new DatanodeStorage(sd.getStorageUuid(),
|
new DatanodeStorage(sd.getStorageUuid(),
|
||||||
DatanodeStorage.State.NORMAL,
|
DatanodeStorage.State.NORMAL,
|
||||||
|
|
|
@ -167,6 +167,20 @@ class ReplicaMap {
|
||||||
map.putAll(other.map);
|
map.putAll(other.map);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merge all entries from the given replica map into the local replica map.
|
||||||
|
*/
|
||||||
|
void mergeAll(ReplicaMap other) {
|
||||||
|
other.map.forEach(
|
||||||
|
(bp, replicaInfos) -> {
|
||||||
|
replicaInfos.forEach(
|
||||||
|
replicaInfo -> add(bp, replicaInfo)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove the replica's meta information from the map that matches
|
* Remove the replica's meta information from the map that matches
|
||||||
* the input block's id and generation stamp
|
* the input block's id and generation stamp
|
||||||
|
|
|
@ -420,6 +420,74 @@ public class TestDataNodeHotSwapVolumes {
|
||||||
verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
|
verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test re-adding one volume with some blocks on a running MiniDFSCluster
|
||||||
|
* with only one NameNode to reproduce HDFS-13677.
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testReAddVolumeWithBlocks()
|
||||||
|
throws IOException, ReconfigurationException,
|
||||||
|
InterruptedException, TimeoutException {
|
||||||
|
startDFSCluster(1, 1);
|
||||||
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
final int numBlocks = 10;
|
||||||
|
|
||||||
|
Path testFile = new Path("/test");
|
||||||
|
createFile(testFile, numBlocks);
|
||||||
|
|
||||||
|
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
|
||||||
|
cluster.getAllBlockReports(bpid);
|
||||||
|
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||||
|
assertEquals(2, blockReports.get(0).size()); // 2 volumes
|
||||||
|
|
||||||
|
// Now remove the second volume
|
||||||
|
DataNode dn = cluster.getDataNodes().get(0);
|
||||||
|
Collection<String> oldDirs = getDataDirs(dn);
|
||||||
|
String newDirs = oldDirs.iterator().next(); // Keep the first volume.
|
||||||
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
assertFileLocksReleased(
|
||||||
|
new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
|
||||||
|
|
||||||
|
// Now create another file - the first volume should have 15 blocks
|
||||||
|
// and 5 blocks on the previously removed volume
|
||||||
|
createFile(new Path("/test2"), numBlocks);
|
||||||
|
dn.scheduleAllBlockReport(0);
|
||||||
|
blockReports = cluster.getAllBlockReports(bpid);
|
||||||
|
|
||||||
|
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||||
|
assertEquals(1, blockReports.get(0).size()); // 1 volume
|
||||||
|
for (BlockListAsLongs blockList : blockReports.get(0).values()) {
|
||||||
|
assertEquals(15, blockList.getNumberOfBlocks());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now add the original volume back again and ensure 15 blocks are reported
|
||||||
|
assertThat(
|
||||||
|
"DN did not update its own config",
|
||||||
|
dn.reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, String.join(",", oldDirs)),
|
||||||
|
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||||
|
dn.scheduleAllBlockReport(0);
|
||||||
|
blockReports = cluster.getAllBlockReports(bpid);
|
||||||
|
|
||||||
|
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||||
|
assertEquals(2, blockReports.get(0).size()); // 2 volumes
|
||||||
|
|
||||||
|
// The order of the block reports is not guaranteed. As we expect 2, get the
|
||||||
|
// max block count and the min block count and then assert on that.
|
||||||
|
int minNumBlocks = Integer.MAX_VALUE;
|
||||||
|
int maxNumBlocks = Integer.MIN_VALUE;
|
||||||
|
for (BlockListAsLongs blockList : blockReports.get(0).values()) {
|
||||||
|
minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
|
||||||
|
maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
|
||||||
|
}
|
||||||
|
assertEquals(5, minNumBlocks);
|
||||||
|
assertEquals(15, maxNumBlocks);
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testAddVolumesDuringWrite()
|
public void testAddVolumesDuringWrite()
|
||||||
throws IOException, InterruptedException, TimeoutException,
|
throws IOException, InterruptedException, TimeoutException,
|
||||||
|
|
|
@ -108,4 +108,26 @@ public class TestReplicaMap {
|
||||||
map.add(bpid, new FinalizedReplica(block, null, null));
|
map.add(bpid, new FinalizedReplica(block, null, null));
|
||||||
assertNotNull(map.remove(bpid, block.getBlockId()));
|
assertNotNull(map.remove(bpid, block.getBlockId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMergeAll() {
|
||||||
|
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||||
|
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||||
|
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||||
|
|
||||||
|
map.mergeAll(temReplicaMap);
|
||||||
|
assertNotNull(map.get(bpid, 1234));
|
||||||
|
assertNotNull(map.get(bpid, 5678));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddAll() {
|
||||||
|
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||||
|
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||||
|
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||||
|
|
||||||
|
map.addAll(temReplicaMap);
|
||||||
|
assertNull(map.get(bpid, 1234));
|
||||||
|
assertNotNull(map.get(bpid, 5678));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue