HDFS-5153. Datanode should send block reports for each storage in a separate message. (Arpit Agarwal)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1563254 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in: parent 5a2428d39f, commit 5beeb30169
@@ -297,6 +297,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and
     the corresponding byte value. (jing9)
 
+    HDFS-5153. Datanode should send block reports for each storage in a
+    separate message. (Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@@ -399,6 +399,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000;
   public static final String  DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
+  public static final String  DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
+  public static final long    DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
   public static final String  DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
   public static final long    DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
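
The two added constants define the new dfs.blockreport.split.threshold key and its default of 1000 * 1000 blocks. A minimal sketch of reading the key, mirroring what DNConf does later in this patch (the wrapper class name here is illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Illustrative only: resolve the split threshold the way DNConf does.
    public class SplitThresholdLookup {
      public static long splitThreshold(Configuration conf) {
        // Falls back to 1000 * 1000 blocks unless hdfs-site.xml overrides it.
        return conf.getLong(
            DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
            DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
      }
    }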
@@ -1616,15 +1616,19 @@ public class BlockManager {
   /**
    * The given storage is reporting all its blocks.
    * Update the (storage-->block list) and (block-->storage list) maps.
+   *
+   * @return true if all known storages of the given DN have finished reporting.
+   * @throws IOException
    */
-  public void processReport(final DatanodeID nodeID,
+  public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage, final String poolId,
       final BlockListAsLongs newReport) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
     final long endTime;
+    DatanodeDescriptor node;
     try {
-      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
@@ -1632,13 +1636,21 @@ public class BlockManager {
 
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
-      final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
+      DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
+
+      if (storageInfo == null) {
+        // We handle this for backwards compatibility.
+        storageInfo = node.updateStorage(storage);
+        LOG.warn("Unknown storageId " + storage.getStorageID() +
+            ", updating storageMap. This indicates a buggy " +
+            "DataNode that isn't heartbeating correctly.");
+      }
       if (namesystem.isInStartupSafeMode()
           && storageInfo.getBlockReportCount() > 0) {
         blockLog.info("BLOCK* processReport: "
             + "discarded non-initial block report from " + nodeID
             + " because namenode still in startup phase");
-        return;
+        return !node.hasStaleStorages();
       }
 
       if (storageInfo.numBlocks() == 0) {
@@ -1655,7 +1667,7 @@ public class BlockManager {
       storageInfo.receivedBlockReport();
       if (staleBefore && !storageInfo.areBlockContentsStale()) {
         LOG.info("BLOCK* processReport: Received first block report from "
-            + node + " after starting up or becoming active. Its block "
+            + storage + " after starting up or becoming active. Its block "
             + "contents are no longer considered stale");
         rescanPostponedMisreplicatedBlocks();
       }
@@ -1670,9 +1682,10 @@ public class BlockManager {
     if (metrics != null) {
       metrics.addBlockReport((int) (endTime - startTime));
     }
-    blockLog.info("BLOCK* processReport: from "
-        + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+    blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
+        + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
         + ", processing time: " + (endTime - startTime) + " msecs");
+    return !node.hasStaleStorages();
   }
 
   /**
@@ -1827,7 +1840,7 @@ public class BlockManager {
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
+    final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
 
     // place a delimiter in the list which separates blocks
     // that have been reported from those that have not
@@ -257,6 +257,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  boolean hasStaleStorages() {
+    synchronized (storageMap) {
+      for (DatanodeStorageInfo storage : storageMap.values()) {
+        if (storage.areBlockContentsStale()) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
   /**
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
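
The new DatanodeDescriptor.hasStaleStorages() above scans every storage under the storageMap lock and short-circuits on the first one whose block contents are still stale; processReport() returns its negation so the NameNode can tell when every storage of a DataNode has finished reporting. A self-contained sketch of the same any-match scan, with StorageState as a hypothetical stand-in for DatanodeStorageInfo:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-in for the scan in the patch: true if any storage
    // on this node is still marked stale.
    class StaleScanSketch {
      static class StorageState {
        final boolean blockContentsStale;
        StorageState(boolean stale) { this.blockContentsStale = stale; }
      }

      private final Map<String, StorageState> storageMap =
          new HashMap<String, StorageState>();

      boolean hasStaleStorages() {
        synchronized (storageMap) {   // same lock discipline as the patch
          for (StorageState s : storageMap.values()) {
            if (s.blockContentsStale) {
              return true;            // early exit on the first stale storage
            }
          }
          return false;
        }
      }
    }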
@@ -22,11 +22,9 @@ import static org.apache.hadoop.util.Time.now;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -437,59 +435,87 @@ class BPServiceActor implements Runnable {
 
   /**
    * Report the list blocks to the Namenode
+   * @return DatanodeCommands returned by the NN. May be null.
    * @throws IOException
    */
-  DatanodeCommand blockReport() throws IOException {
+  List<DatanodeCommand> blockReport() throws IOException {
     // send block report if timer has expired.
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
+    final long startTime = now();
+    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
+      return null;
+    }
 
-      // Flush any block information that precedes the block report. Otherwise
-      // we have a chance that we will miss the delHint information
-      // or we will report an RBW replica after the BlockReport already reports
-      // a FINALIZED one.
-      reportReceivedDeletedBlocks();
+    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
 
-      // Send one block report per known storage.
+    // Flush any block information that precedes the block report. Otherwise
+    // we have a chance that we will miss the delHint information
+    // or we will report an RBW replica after the BlockReport already reports
+    // a FINALIZED one.
+    reportReceivedDeletedBlocks();
+    lastDeletedReport = startTime;
 
-      // Create block report
-      long brCreateStartTime = now();
-      long totalBlockCount = 0;
-
-      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
-          dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
+    long brCreateStartTime = now();
+    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
 
-      // Send block report
-      long brSendStartTime = now();
-      StorageBlockReport[] reports =
-          new StorageBlockReport[perVolumeBlockLists.size()];
-      int i = 0;
-      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
-        DatanodeStorage dnStorage = kvPair.getKey();
-        BlockListAsLongs blockList = kvPair.getValue();
-        totalBlockCount += blockList.getNumberOfBlocks();
+    // Convert the reports to the format expected by the NN.
+    int i = 0;
+    int totalBlockCount = 0;
+    StorageBlockReport reports[] =
+        new StorageBlockReport[perVolumeBlockLists.size()];
 
-        reports[i++] =
-            new StorageBlockReport(
-                dnStorage, blockList.getBlockListAsLongs());
-      }
+    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      BlockListAsLongs blockList = kvPair.getValue();
+      reports[i++] = new StorageBlockReport(
+          kvPair.getKey(), blockList.getBlockListAsLongs());
+      totalBlockCount += blockList.getNumberOfBlocks();
+    }
 
-      cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
+    // Send the reports to the NN.
+    int numReportsSent;
+    long brSendStartTime = now();
+    if (totalBlockCount < dnConf.blockReportSplitThreshold) {
+      // Below split threshold, send all reports in a single message.
+      numReportsSent = 1;
+      DatanodeCommand cmd =
+          bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
+      if (cmd != null) {
+        cmds.add(cmd);
+      }
+    } else {
+      // Send one block report per message.
+      numReportsSent = i;
+      for (StorageBlockReport report : reports) {
+        StorageBlockReport singleReport[] = { report };
+        DatanodeCommand cmd = bpNamenode.blockReport(
+            bpRegistration, bpos.getBlockPoolId(), singleReport);
+        if (cmd != null) {
+          cmds.add(cmd);
+        }
+      }
+    }
 
-      // Log the block report processing stats from Datanode perspective
-      long brSendCost = now() - brSendStartTime;
-      long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.getMetrics().addBlockReport(brSendCost);
-      LOG.info("BlockReport of " + totalBlockCount
-          + " blocks took " + brCreateCost + " msec to generate and "
-          + brSendCost + " msecs for RPC and NN processing");
+    // Log the block report processing stats from Datanode perspective
+    long brSendCost = now() - brSendStartTime;
+    long brCreateCost = brSendStartTime - brCreateStartTime;
+    dn.getMetrics().addBlockReport(brSendCost);
+    LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
+        " blocks total. Took " + brCreateCost +
+        " msec to generate and " + brSendCost +
+        " msecs for RPC and NN processing. " +
+        " Got back commands " +
+            (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
 
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
+    scheduleNextBlockReport(startTime);
+    return cmds.size() == 0 ? null : cmds;
+  }
+
+  private void scheduleNextBlockReport(long previousReportStartTime) {
+    // If we have sent the first set of block reports, then wait a random
+    // time before we start the periodic block reports.
+    if (resetBlockReportTime) {
+      lastBlockReport = previousReportStartTime -
+          DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+      resetBlockReportTime = false;
+    } else {
       /* say the last block report was at 8:20:14. The current report
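
The heart of this change is the send path in blockReport() above: when totalBlockCount is below dnConf.blockReportSplitThreshold, every per-storage report travels in a single RPC; otherwise the DataNode issues one RPC per storage and collects whatever commands come back. A standalone sketch of just that decision, under the assumption that Report, Command, and Rpc are hypothetical stand-ins for StorageBlockReport, DatanodeCommand, and the blockReport RPC:

    import java.util.ArrayList;
    import java.util.List;

    // Standalone sketch of the split decision; all names are stand-ins.
    class SplitDecisionSketch {
      interface Rpc { Command send(Report[] reports); }
      static class Report {}
      static class Command {}

      static List<Command> sendReports(Rpc nn, Report[] reports,
          long totalBlockCount, long splitThreshold) {
        List<Command> cmds = new ArrayList<Command>();
        if (totalBlockCount < splitThreshold) {
          // Below the threshold: all storages in a single message.
          Command cmd = nn.send(reports);
          if (cmd != null) {
            cmds.add(cmd);
          }
        } else {
          // At or above the threshold: one message per storage.
          for (Report report : reports) {
            Command cmd = nn.send(new Report[] { report });
            if (cmd != null) {
              cmds.add(cmd);
            }
          }
        }
        return cmds;
      }
    }

Returning a list rather than a single command lets the caller process commands from several RPCs in one pass, which matches the new processCommand call in offerService() below.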
@@ -501,9 +527,6 @@ class BPServiceActor implements Runnable {
       lastBlockReport += (now() - lastBlockReport) /
           dnConf.blockReportInterval * dnConf.blockReportInterval;
     }
-      LOG.info("sent block report, processed command:" + cmd);
-    }
-    return cmd;
   }
 
   DatanodeCommand cacheReport() throws IOException {
@@ -513,7 +536,7 @@ class BPServiceActor implements Runnable {
     }
     // send cache report if timer has expired.
     DatanodeCommand cmd = null;
-    long startTime = Time.monotonicNow();
+    final long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Sending cacheReport from service actor: " + this);
@@ -613,7 +636,7 @@ class BPServiceActor implements Runnable {
     //
     while (shouldRun()) {
       try {
-        long startTime = now();
+        final long startTime = now();
 
         //
         // Every so often, send heartbeat or block-report
@@ -659,10 +682,10 @@ class BPServiceActor implements Runnable {
           lastDeletedReport = startTime;
         }
 
-        DatanodeCommand cmd = blockReport();
-        processCommand(new DatanodeCommand[]{ cmd });
+        List<DatanodeCommand> cmds = blockReport();
+        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
-        cmd = cacheReport();
+        DatanodeCommand cmd = cacheReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
         // Now safe to start scanning the block pool.
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -70,6 +72,7 @@ public class DNConf {
   final long readaheadLength;
   final long heartBeatInterval;
   final long blockReportInterval;
+  final long blockReportSplitThreshold;
   final long deleteReportInterval;
   final long initialBlockReportDelay;
   final long cacheReportInterval;
@@ -117,6 +120,8 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
+        DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
     this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
         DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
@@ -982,13 +982,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
           + "from " + nodeReg + ", reports.length=" + reports.length);
     }
     final BlockManager bm = namesystem.getBlockManager();
+    boolean hasStaleStorages = true;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
-      bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
     }
 
-    if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
+    if (nn.getFSImage().isUpgradeFinalized() &&
+        !nn.isStandbyState() &&
+        !hasStaleStorages) {
       return new FinalizeCommand(poolId);
+    }
+
     return null;
   }
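
Because a split report means one RPC may cover only a single storage, the NameNode now withholds FinalizeCommand until processReport() indicates that no storage of the reporting node is stale. The gate reduces to a three-way predicate; a minimal sketch where the booleans stand in for nn.getFSImage().isUpgradeFinalized(), nn.isStandbyState(), and the value returned by bm.processReport():

    // Illustrative reduction of the FinalizeCommand gate in the patch above.
    public class FinalizeGateSketch {
      public static boolean shouldFinalize(boolean upgradeFinalized,
          boolean isStandby, boolean hasStaleStorages) {
        return upgradeFinalized && !isStandby && !hasStaleStorages;
      }
    }

Note that the loop keeps only the value from the last report processed; that is sufficient because processReport() evaluates staleness across all storages of the node on every call.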
@@ -482,6 +482,20 @@
   <description>Delay for first block report in seconds.</description>
 </property>
 
+<property>
+  <name>dfs.blockreport.split.threshold</name>
+  <value>1000000</value>
+  <description>If the number of blocks on the DataNode is below this
+  threshold then it will send block reports for all Storage Directories
+  in a single message.
+
+  If the number of blocks exceeds this threshold then the DataNode will
+  send block reports for each Storage Directory in separate messages.
+
+  Set to zero to always split.
+  </description>
+</property>
+
 <property>
   <name>dfs.datanode.directoryscan.interval</name>
   <value>21600</value>
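
Operators can tune the property per cluster; setting it to zero forces a separate block-report message per storage directory, which is exactly how the new TestDnRespectsBlockReportSplitThreshold below exercises the split path. A sketch mirroring that setup (the wrapper class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;

    // Illustrative setup: a threshold of 0 splits every report, mirroring
    // the startUpCluster(0) call in the new test added below.
    public class ForceSplitConfig {
      public static Configuration forceSplit() {
        Configuration conf = new HdfsConfiguration();
        conf.setLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, 0L);
        return conf;
      }
    }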
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -69,20 +68,16 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 
 /**
- * This test simulates a variety of situations when blocks are being
- * intentionally corrupted, unexpectedly modified, and so on before a block
- * report is happening.
+ * This is the base class for simulating a variety of situations
+ * when blocks are being intentionally corrupted, unexpectedly modified,
+ * and so on before a block report is happening.
  *
- * For each test case it runs two variations:
- * #1 - For a given DN, the first variation sends block reports for all
- * storages in a single call to the NN.
- * #2 - For a given DN, the second variation sends block reports for each
- * storage in a separate call.
- *
- * The behavior should be the same in either variation.
+ * By overriding {@link #sendBlockReports}, derived classes can test
+ * different variations of how block reports are split across storages
+ * and messages.
  */
-public class TestBlockReport {
-  public static final Log LOG = LogFactory.getLog(TestBlockReport.class);
+public abstract class BlockReportTestBase {
+  public static final Log LOG = LogFactory.getLog(BlockReportTestBase.class);
 
   private static short REPL_FACTOR = 1;
   private static final int RAND_LIMIT = 2000;
@@ -91,12 +86,11 @@ public class TestBlockReport {
   private static final int DN_N0 = 0;
   private static final int FILE_START = 0;
 
-  static final int BLOCK_SIZE = 1024;
-  static final int NUM_BLOCKS = 10;
-  static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
-  static String bpid;
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_BLOCKS = 10;
+  private static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
 
-  private MiniDFSCluster cluster;
+  protected MiniDFSCluster cluster;
   private DistributedFileSystem fs;
 
   private static Random rand = new Random(RAND_LIMIT);
@@ -112,8 +106,7 @@ public class TestBlockReport {
   public void startUpCluster() throws IOException {
     REPL_FACTOR = 1; //Reset if case a test has modified the value
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
-    fs = (DistributedFileSystem) cluster.getFileSystem();
-    bpid = cluster.getNamesystem().getBlockPoolId();
+    fs = cluster.getFileSystem();
   }
 
   @After
@@ -123,6 +116,15 @@ public class TestBlockReport {
     cluster.shutdown();
   }
 
+  protected static void resetConfiguration() {
+    conf = new Configuration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
+  }
+
   // Generate a block report, optionally corrupting the generation
   // stamp and/or length of one block.
   private static StorageBlockReport[] getBlockReports(
@@ -172,106 +174,11 @@ public class TestBlockReport {
    * @param dnR
    * @param poolId
    * @param reports
-   * @param needtoSplit
    * @throws IOException
    */
-  private void sendBlockReports(DatanodeRegistration dnR, String poolId,
-      StorageBlockReport[] reports, boolean needtoSplit) throws IOException {
-    if (!needtoSplit) {
-      LOG.info("Sending combined block reports for " + dnR);
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
-    } else {
-      for (StorageBlockReport report : reports) {
-        LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
-        StorageBlockReport[] singletonReport = { report };
-        cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
-      }
-    }
-  }
-
-  /**
-   * Test variations blockReport_01 through blockReport_09 with combined
-   * and split block reports.
-   */
-  @Test
-  public void blockReportCombined_01() throws IOException {
-    blockReport_01(false);
-  }
-
-  @Test
-  public void blockReportSplit_01() throws IOException {
-    blockReport_01(true);
-  }
-
-  @Test
-  public void blockReportCombined_02() throws IOException {
-    blockReport_02(false);
-  }
-
-  @Test
-  public void blockReportSplit_02() throws IOException {
-    blockReport_02(true);
-  }
-
-  @Test
-  public void blockReportCombined_03() throws IOException {
-    blockReport_03(false);
-  }
-
-  @Test
-  public void blockReportSplit_03() throws IOException {
-    blockReport_03(true);
-  }
-
-  @Test
-  public void blockReportCombined_04() throws IOException {
-    blockReport_04(false);
-  }
-
-  @Test
-  public void blockReportSplit_04() throws IOException {
-    blockReport_04(true);
-  }
-
-  @Test
-  public void blockReportCombined_06() throws Exception {
-    blockReport_06(false);
-  }
-
-  @Test
-  public void blockReportSplit_06() throws Exception {
-    blockReport_06(true);
-  }
-
-  @Test
-  public void blockReportCombined_07() throws Exception {
-    blockReport_07(false);
-  }
-
-  @Test
-  public void blockReportSplit_07() throws Exception {
-    blockReport_07(true);
-  }
-
-  @Test
-  public void blockReportCombined_08() throws Exception {
-    blockReport_08(false);
-  }
-
-  @Test
-  public void blockReportSplit_08() throws Exception {
-    blockReport_08(true);
-  }
-
-  @Test
-  public void blockReportCombined_09() throws Exception {
-    blockReport_09(false);
-  }
-
-  @Test
-  public void blockReportSplit_09() throws Exception {
-    blockReport_09(true);
-  }
+  protected abstract void sendBlockReports(DatanodeRegistration dnR, String poolId,
+      StorageBlockReport[] reports) throws IOException;
 
   /**
   * Test write a file, verifies and closes it. Then the length of the blocks
   * are messed up and BlockReport is forced.
@@ -279,7 +186,8 @@ public class TestBlockReport {
    *
    * @throws java.io.IOException on an error
    */
-  private void blockReport_01(boolean splitBlockReports) throws IOException {
+  @Test(timeout=300000)
+  public void blockReport_01() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
 
@@ -312,7 +220,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
 
     List<LocatedBlock> blocksAfterReport =
         DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -338,7 +246,8 @@ public class TestBlockReport {
    *
    * @throws IOException in case of errors
    */
-  private void blockReport_02(boolean splitBlockReports) throws IOException {
+  @Test(timeout=300000)
+  public void blockReport_02() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     LOG.info("Running test " + METHOD_NAME);
 
@@ -393,7 +302,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
 
     BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
         .getBlockManager());
@@ -414,7 +323,8 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  private void blockReport_03(boolean splitBlockReports) throws IOException {
+  @Test(timeout=300000)
+  public void blockReport_03() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
@@ -424,7 +334,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -441,7 +351,8 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  private void blockReport_04(boolean splitBlockReports) throws IOException {
+  @Test(timeout=300000)
+  public void blockReport_04() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     DFSTestUtil.createFile(fs, filePath,
@@ -459,7 +370,7 @@ public class TestBlockReport {
 
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -476,7 +387,8 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  private void blockReport_06(boolean splitBlockReports) throws Exception {
+  @Test(timeout=300000)
+  public void blockReport_06() throws Exception {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -489,7 +401,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
         0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -508,7 +420,8 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  private void blockReport_07(boolean splitBlockReports) throws Exception {
+  @Test(timeout=300000)
+  public void blockReport_07() throws Exception {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -522,7 +435,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -533,7 +446,7 @@ public class TestBlockReport {
         cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
 
     reports = getBlockReports(dn, poolId, true, true);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -559,7 +472,8 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  private void blockReport_08(boolean splitBlockReports) throws IOException {
+  @Test(timeout=300000)
+  public void blockReport_08() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -584,7 +498,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
     assertEquals("Wrong number of PendingReplication blocks",
         blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -600,7 +514,8 @@ public class TestBlockReport {
   // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's
   // replica block. Expect the same behaviour: NN should simply ignore this
   // block
-  private void blockReport_09(boolean splitBlockReports) throws IOException {
+  @Test(timeout=300000)
+  public void blockReport_09() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -626,7 +541,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
-    sendBlockReports(dnR, poolId, reports, splitBlockReports);
+    sendBlockReports(dnR, poolId, reports);
     printStats();
     assertEquals("Wrong number of PendingReplication blocks",
         2, cluster.getNamesystem().getPendingReplicationBlocks());
@@ -648,7 +563,7 @@ public class TestBlockReport {
    * corrupt.
    * This is a regression test for HDFS-2791.
    */
-  @Test
+  @Test(timeout=300000)
   public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
     final CountDownLatch brFinished = new CountDownLatch(1);
     DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
@@ -898,7 +813,7 @@ public class TestBlockReport {
     ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger) TestBlockReport.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) BlockReportTestBase.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private Block findBlock(Path path, long size) throws IOException {
@@ -933,13 +848,4 @@ public class TestBlockReport {
       }
     }
   }
-
-  private static void resetConfiguration() {
-    conf = new Configuration();
-    int customPerChecksumSize = 512;
-    int customBlockSize = customPerChecksumSize * 3;
-    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
-    conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
-  }
 }
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests that the DataNode respects
+ * {@link DFSConfigKeys#DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY}
+ */
+public class TestDnRespectsBlockReportSplitThreshold {
+  public static final Log LOG = LogFactory.getLog(TestStorageReport.class);
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final short REPL_FACTOR = 1;
+  private static final long seed = 0xFEEDFACE;
+  private static final int BLOCKS_IN_FILE = 5;
+
+  private static Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  static String bpid;
+
+  public void startUpCluster(long splitThreshold) throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, splitThreshold);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(REPL_FACTOR)
+        .build();
+    fs = cluster.getFileSystem();
+    bpid = cluster.getNamesystem().getBlockPoolId();
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    if (cluster != null) {
+      fs.close();
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void createFile(String filenamePrefix, int blockCount)
+      throws IOException {
+    Path path = new Path("/" + filenamePrefix + ".dat");
+    DFSTestUtil.createFile(fs, path, BLOCK_SIZE,
+        blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
+  }
+
+  private void verifyCapturedArguments(
+      ArgumentCaptor<StorageBlockReport[]> captor,
+      int expectedReportsPerCall,
+      int expectedTotalBlockCount) {
+
+    List<StorageBlockReport[]> listOfReports = captor.getAllValues();
+    int numBlocksReported = 0;
+    for (StorageBlockReport[] reports : listOfReports) {
+      assertThat(reports.length, is(expectedReportsPerCall));
+
+      for (StorageBlockReport report : reports) {
+        BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
+        numBlocksReported += blockList.getNumberOfBlocks();
+      }
+    }
+
+    assert(numBlocksReported >= expectedTotalBlockCount);
+  }
+
+  /**
+   * Test that if splitThreshold is zero, then we always get a separate
+   * call per storage.
+   */
+  @Test(timeout=300000)
+  public void testAlwaysSplit() throws IOException, InterruptedException {
+    startUpCluster(0);
+    NameNode nn = cluster.getNameNode();
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Create a file with a few blocks.
+    createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
+
+    // Insert a spy object for the NN RPC.
+    DatanodeProtocolClientSideTranslatorPB nnSpy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+    // Trigger a block report so there is an interaction with the spy
+    // object.
+    DataNodeTestUtils.triggerBlockReport(dn);
+
+    ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
+        any(DatanodeRegistration.class),
+        anyString(),
+        captor.capture());
+
+    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+  }
+
+  /**
+   * Tests the behavior when the count of blocks is exactly one less than
+   * the threshold.
+   */
+  @Test(timeout=300000)
+  public void testCornerCaseUnderThreshold() throws IOException, InterruptedException {
+    startUpCluster(BLOCKS_IN_FILE + 1);
+    NameNode nn = cluster.getNameNode();
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Create a file with a few blocks.
+    createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
+
+    // Insert a spy object for the NN RPC.
+    DatanodeProtocolClientSideTranslatorPB nnSpy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+    // Trigger a block report so there is an interaction with the spy
+    // object.
+    DataNodeTestUtils.triggerBlockReport(dn);
+
+    ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.verify(nnSpy, times(1)).blockReport(
+        any(DatanodeRegistration.class),
+        anyString(),
+        captor.capture());
+
+    verifyCapturedArguments(captor, MiniDFSCluster.DIRS_PER_DATANODE, BLOCKS_IN_FILE);
+  }
+
+  /**
+   * Tests the behavior when the count of blocks is exactly equal to the
+   * threshold.
+   */
+  @Test(timeout=300000)
+  public void testCornerCaseAtThreshold() throws IOException, InterruptedException {
+    startUpCluster(BLOCKS_IN_FILE);
+    NameNode nn = cluster.getNameNode();
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Create a file with a few blocks.
+    createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
+
+    // Insert a spy object for the NN RPC.
+    DatanodeProtocolClientSideTranslatorPB nnSpy =
+        DataNodeTestUtils.spyOnBposToNN(dn, nn);
+
+    // Trigger a block report so there is an interaction with the spy
+    // object.
+    DataNodeTestUtils.triggerBlockReport(dn);
+
+    ArgumentCaptor<StorageBlockReport[]> captor =
+        ArgumentCaptor.forClass(StorageBlockReport[].class);
+
+    Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
+        any(DatanodeRegistration.class),
+        anyString(),
+        captor.capture());
+
+    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+  }
+
+}
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+
+/**
+ * Runs all tests in BlockReportTestBase, sending one block report per
+ * storage. This is the default DataNode behavior post HDFS-2832.
+ */
+public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
+
+  @Override
+  protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
+      StorageBlockReport[] reports) throws IOException {
+    for (StorageBlockReport report : reports) {
+      LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
+      StorageBlockReport[] singletonReport = { report };
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+    }
+  }
+}
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+
+/**
+ * Runs all tests in BlockReportTestBase, sending one block report
+ * per DataNode. This tests that the NN can handle the legacy DN
+ * behavior where it presents itself as a single logical storage.
+ */
+public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
+
+  @Override
+  protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
+      StorageBlockReport[] reports) throws IOException {
+    LOG.info("Sending combined block reports for " + dnR);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+  }
+}