HDFS-16999. Fix wrong use of processFirstBlockReport(). (#5622). Contributed by Shuyan Zhang.
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org> Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
This commit is contained in:
parent
bdeca45294
commit
03bf8f982a
|
@ -2913,7 +2913,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
return !node.hasStaleStorages();
|
return !node.hasStaleStorages();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (storageInfo.getBlockReportCount() == 0) {
|
if (!storageInfo.hasReceivedBlockReport()) {
|
||||||
// The first block report can be processed a lot more efficiently than
|
// The first block report can be processed a lot more efficiently than
|
||||||
// ordinary block reports. This shortens restart times.
|
// ordinary block reports. This shortens restart times.
|
||||||
blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: Processing first "
|
blockLog.info("BLOCK* processReport 0x{} with lease ID 0x{}: Processing first "
|
||||||
|
|
|
@ -128,6 +128,10 @@ public class DatanodeStorageInfo {
|
||||||
/** The number of block reports received */
|
/** The number of block reports received */
|
||||||
private int blockReportCount = 0;
|
private int blockReportCount = 0;
|
||||||
|
|
||||||
|
/** Whether the NameNode has received block reports for this storage since it
|
||||||
|
* was started.*/
|
||||||
|
private boolean hasReceivedBlockReport = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set to false on any NN failover, and reset to true
|
* Set to false on any NN failover, and reset to true
|
||||||
* whenever a block report is received.
|
* whenever a block report is received.
|
||||||
|
@ -160,6 +164,10 @@ public class DatanodeStorageInfo {
|
||||||
return blockReportCount;
|
return blockReportCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean hasReceivedBlockReport() {
|
||||||
|
return hasReceivedBlockReport;
|
||||||
|
}
|
||||||
|
|
||||||
void setBlockReportCount(int blockReportCount) {
|
void setBlockReportCount(int blockReportCount) {
|
||||||
this.blockReportCount = blockReportCount;
|
this.blockReportCount = blockReportCount;
|
||||||
}
|
}
|
||||||
|
@ -188,6 +196,7 @@ public class DatanodeStorageInfo {
|
||||||
blockContentsStale = false;
|
blockContentsStale = false;
|
||||||
}
|
}
|
||||||
blockReportCount++;
|
blockReportCount++;
|
||||||
|
hasReceivedBlockReport = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -350,7 +350,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
File getCurrentDir() {
|
public File getCurrentDir() {
|
||||||
return currentDir;
|
return currentDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.LinkedListMultimap;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.LinkedListMultimap;
|
||||||
|
@ -2066,4 +2069,56 @@ public class TestBlockManager {
|
||||||
// validateReconstructionWork return false, need to perform resetTargets().
|
// validateReconstructionWork return false, need to perform resetTargets().
|
||||||
assertNull(work.getTargets());
|
assertNull(work.getTargets());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether the first block report after DataNode restart is completely
|
||||||
|
* processed.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBlockReportAfterDataNodeRestart() throws Exception {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(3).storagesPerDatanode(1).build()) {
|
||||||
|
cluster.waitActive();
|
||||||
|
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path filePath = new Path("/tmp.txt");
|
||||||
|
final long fileLen = 1L;
|
||||||
|
DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, 1L);
|
||||||
|
DFSTestUtil.waitForReplication(fs, filePath, (short) 3, 60000);
|
||||||
|
ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
assertEquals(datanodes.size(), 3);
|
||||||
|
|
||||||
|
// Stop RedundancyMonitor.
|
||||||
|
blockManager.setInitializedReplQueues(false);
|
||||||
|
|
||||||
|
// Delete the replica on the first datanode.
|
||||||
|
DataNode dn = datanodes.get(0);
|
||||||
|
int dnIpcPort = dn.getIpcPort();
|
||||||
|
File dnDir = dn.getFSDataset().getVolumeList().get(0).getCurrentDir();
|
||||||
|
String[] children = FileUtil.list(dnDir);
|
||||||
|
for (String s : children) {
|
||||||
|
if (!s.equals("VERSION")) {
|
||||||
|
FileUtil.fullyDeleteContents(new File(dnDir, s));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The number of replicas is still 3 because the datanode has not sent
|
||||||
|
// a new block report.
|
||||||
|
FileStatus stat = fs.getFileStatus(filePath);
|
||||||
|
BlockLocation[] locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
|
||||||
|
assertEquals(3, locs[0].getHosts().length);
|
||||||
|
|
||||||
|
// Restart the first datanode.
|
||||||
|
cluster.restartDataNode(0, true);
|
||||||
|
|
||||||
|
// Wait for the block report to be processed.
|
||||||
|
cluster.waitDatanodeFullyStarted(cluster.getDataNode(dnIpcPort), 10000);
|
||||||
|
cluster.waitFirstBRCompleted(0, 10000);
|
||||||
|
|
||||||
|
// The replica num should be 2.
|
||||||
|
locs = fs.getFileBlockLocations(stat, 0, stat.getLen());
|
||||||
|
assertEquals(2, locs[0].getHosts().length);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue