HDFS-13677. Dynamic refresh Disk configuration results in overwriting VolumeMap. Contributed by xuzq and Stephen O'Donnell.
(cherry picked from commit 102c8fca10f3c626ab8bc47f818c8391a5c35289)
(cherry picked from commit 4a1d51dea2)
This commit is contained in:
parent
cdd3982db4
commit
9f713bb825
|
@ -453,7 +453,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
LOG.error(errorMsg);
|
||||
throw new IOException(errorMsg);
|
||||
}
|
||||
volumeMap.addAll(replicaMap);
|
||||
volumeMap.mergeAll(replicaMap);
|
||||
storageMap.put(sd.getStorageUuid(),
|
||||
new DatanodeStorage(sd.getStorageUuid(),
|
||||
DatanodeStorage.State.NORMAL,
|
||||
|
|
|
@ -150,7 +150,19 @@ class ReplicaMap {
|
|||
void addAll(ReplicaMap other) {
|
||||
map.putAll(other.map);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Merge all entries from the given replica map into the local replica map.
|
||||
*/
|
||||
void mergeAll(ReplicaMap other) {
|
||||
for(String bp : other.getBlockPoolList()) {
|
||||
for(ReplicaInfo r : other.map.get(bp)) {
|
||||
add(bp, r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the replica's meta information from the map that matches
|
||||
* the input block's id and generation stamp
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -421,6 +422,75 @@ public class TestDataNodeHotSwapVolumes {
|
|||
verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test re-adding one volume with some blocks on a running MiniDFSCluster
|
||||
* with only one NameNode to reproduce HDFS-13677.
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testReAddVolumeWithBlocks()
|
||||
throws IOException, ReconfigurationException,
|
||||
InterruptedException, TimeoutException {
|
||||
startDFSCluster(1, 1);
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
final int numBlocks = 10;
|
||||
|
||||
Path testFile = new Path("/test");
|
||||
createFile(testFile, numBlocks);
|
||||
|
||||
List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
|
||||
cluster.getAllBlockReports(bpid);
|
||||
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||
assertEquals(2, blockReports.get(0).size()); // 2 volumes
|
||||
|
||||
// Now remove the second volume
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
Collection<String> oldDirs = getDataDirs(dn);
|
||||
String newDirs = oldDirs.iterator().next(); // Keep the first volume.
|
||||
assertThat(
|
||||
"DN did not update its own config",
|
||||
dn.reconfigurePropertyImpl(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs),
|
||||
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||
assertFileLocksReleased(
|
||||
new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
|
||||
|
||||
// Now create another file - the first volume should have 15 blocks
|
||||
// and 5 blocks on the previously removed volume
|
||||
createFile(new Path("/test2"), numBlocks);
|
||||
dn.scheduleAllBlockReport(0);
|
||||
blockReports = cluster.getAllBlockReports(bpid);
|
||||
|
||||
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||
assertEquals(1, blockReports.get(0).size()); // 1 volume
|
||||
for (BlockListAsLongs blockList : blockReports.get(0).values()) {
|
||||
assertEquals(15, blockList.getNumberOfBlocks());
|
||||
}
|
||||
|
||||
// Now add the original volume back again and ensure 15 blocks are reported
|
||||
assertThat(
|
||||
"DN did not update its own config",
|
||||
dn.reconfigurePropertyImpl(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
|
||||
StringUtils.join(",", oldDirs)),
|
||||
is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY)));
|
||||
dn.scheduleAllBlockReport(0);
|
||||
blockReports = cluster.getAllBlockReports(bpid);
|
||||
|
||||
assertEquals(1, blockReports.size()); // 1 DataNode
|
||||
assertEquals(2, blockReports.get(0).size()); // 2 volumes
|
||||
|
||||
// The order of the block reports is not guaranteed. As we expect 2, get the
|
||||
// max block count and the min block count and then assert on that.
|
||||
int minNumBlocks = Integer.MAX_VALUE;
|
||||
int maxNumBlocks = Integer.MIN_VALUE;
|
||||
for (BlockListAsLongs blockList : blockReports.get(0).values()) {
|
||||
minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
|
||||
maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
|
||||
}
|
||||
assertEquals(5, minNumBlocks);
|
||||
assertEquals(15, maxNumBlocks);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testAddVolumesDuringWrite()
|
||||
throws IOException, InterruptedException, TimeoutException,
|
||||
|
|
|
@ -108,4 +108,26 @@ public class TestReplicaMap {
|
|||
map.add(bpid, new FinalizedReplica(block, null, null));
|
||||
assertNotNull(map.remove(bpid, block.getBlockId()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeAll() {
|
||||
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||
|
||||
map.mergeAll(temReplicaMap);
|
||||
assertNotNull(map.get(bpid, 1234));
|
||||
assertNotNull(map.get(bpid, 5678));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddAll() {
|
||||
ReplicaMap temReplicaMap = new ReplicaMap(new AutoCloseableLock());
|
||||
Block tmpBlock = new Block(5678, 5678, 5678);
|
||||
temReplicaMap.add(bpid, new FinalizedReplica(tmpBlock, null, null));
|
||||
|
||||
map.addAll(temReplicaMap);
|
||||
assertNull(map.get(bpid, 1234));
|
||||
assertNotNull(map.get(bpid, 5678));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue