HDFS-9371. Code cleanup for DatanodeManager. Contributed by Jing Zhao.

Jing Zhao 2015-12-15 10:47:53 -08:00
parent 0c3a53e5a9
commit 8602692338
6 changed files with 229 additions and 238 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -899,6 +899,8 @@ Release 2.9.0 - UNRELEASED
HDFS-9281. Change TestDeleteBlockPool to not explicitly use File to check
block pool existence. (lei)
HDFS-9371. Code cleanup for DatanodeManager. (jing9)
OPTIMIZATIONS
BUG FIXES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -132,6 +132,9 @@ public class BlockManager implements BlockStatsMXBean {
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
// Block pool ID used by this namenode
private String blockPoolId;
private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
@@ -462,11 +465,16 @@ public BlockStoragePolicy[] getStoragePolicies() {
}
public void setBlockPoolId(String blockPoolId) {
this.blockPoolId = blockPoolId;
if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId);
}
}
public String getBlockPoolId() {
return blockPoolId;
}
public BlockStoragePolicySuite getStoragePolicySuite() {
return storagePolicySuite;
}
@@ -1228,18 +1236,6 @@ private Block getBlockOnStorage(BlockInfo storedBlock,
((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
}
/**
* Remove all block invalidation tasks under this datanode UUID;
* used when a datanode registers with a new UUID and the old one
* is wiped.
*/
void removeFromInvalidates(final DatanodeInfo datanode) {
if (!isPopulatingReplQueues()) {
return;
}
invalidateBlocks.remove(datanode);
}
/**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -290,11 +290,11 @@ public void setAlive(boolean isAlive) {
this.isAlive = isAlive;
}
public boolean needKeyUpdate() {
public synchronized boolean needKeyUpdate() {
return needKeyUpdate;
}
public void setNeedKeyUpdate(boolean needKeyUpdate) {
public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
this.needKeyUpdate = needKeyUpdate;
}
@@ -868,14 +868,14 @@ public void updateRegInfo(DatanodeID nodeReg) {
/**
* @return balancer bandwidth in bytes per second for this datanode
*/
public long getBalancerBandwidth() {
public synchronized long getBalancerBandwidth() {
return this.bandwidth;
}
/**
* @param bandwidth balancer bandwidth in bytes per second for this datanode
*/
public void setBalancerBandwidth(long bandwidth) {
public synchronized void setBalancerBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
}
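
The DatanodeDescriptor hunks above are a pure concurrency fix: needKeyUpdate and bandwidth are written and read from different threads, and the getters gain synchronized so that both accessors share one monitor. A minimal sketch of the pattern follows; the class and field names are illustrative stand-ins, not HDFS code.

    // Illustrative stand-in for a descriptor with a shared mutable field.
    class BandwidthHolder {
      private long bandwidth; // guarded by "this"

      // Synchronizing both accessors gives the reading thread visibility
      // of the latest write and rules out word-tearing of the 64-bit
      // value on JVMs where a plain long store is not atomic.
      synchronized long getBandwidth() {
        return bandwidth;
      }

      synchronized void setBandwidth(long bandwidth) {
        this.bandwidth = bandwidth;
      }
    }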

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.annotations.VisibleForTesting;
@@ -162,7 +163,7 @@ public class DatanodeManager {
* during rolling upgrades.
* Software version -> Number of datanodes with this version
*/
private HashMap<String, Integer> datanodesSoftwareVersions =
private final HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<>(4, 0.75f);
/**
@@ -352,15 +353,9 @@ public DatanodeStatistics getDatanodeStatistics() {
}
private boolean isInactive(DatanodeInfo datanode) {
if (datanode.isDecommissioned()) {
return true;
}
if (avoidStaleDataNodesForRead) {
return datanode.isStale(staleInterval);
}
return false;
return datanode.isDecommissioned() ||
(avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
}
/** Sort the located blocks by the distance to the target host. */
@@ -479,8 +474,9 @@ DatanodeDescriptor getDatanode(final String datanodeUuid) {
if (datanodeUuid == null) {
return null;
}
return datanodeMap.get(datanodeUuid);
synchronized (this) {
return datanodeMap.get(datanodeUuid);
}
}
/**
@@ -490,8 +486,8 @@ DatanodeDescriptor getDatanode(final String datanodeUuid) {
* @return DatanodeDescriptor or null if the node is not found.
* @throws UnregisteredNodeException
*/
public DatanodeDescriptor getDatanode(DatanodeID nodeID
) throws UnregisteredNodeException {
public DatanodeDescriptor getDatanode(DatanodeID nodeID)
throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
if (node == null)
return null;
@@ -535,13 +531,13 @@ public DatanodeStorageInfo[] getDatanodeStorageInfos(
/** Prints information about all datanodes. */
void datanodeDump(final PrintWriter out) {
synchronized (datanodeMap) {
Map<String,DatanodeDescriptor> sortedDatanodeMap =
new TreeMap<>(datanodeMap);
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
out.println(node.dumpDatanode());
}
final Map<String,DatanodeDescriptor> sortedDatanodeMap;
synchronized (this) {
sortedDatanodeMap = new TreeMap<>(datanodeMap);
}
out.println("Metasave: Number of datanodes: " + sortedDatanodeMap.size());
for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
out.println(node.dumpDatanode());
}
}
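
The rewritten datanodeDump above narrows the critical section: the map is snapshotted into a sorted copy while holding the manager's monitor, and the slow PrintWriter I/O runs outside it. A hedged sketch of the same snapshot-then-iterate pattern, with illustrative names:

    import java.io.PrintWriter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    class Registry {
      private final Map<String, String> nodes = new HashMap<>(); // guarded by "this"

      void dump(PrintWriter out) {
        final Map<String, String> snapshot;
        synchronized (this) {
          // Copy under the lock; TreeMap also sorts the output by key.
          snapshot = new TreeMap<>(nodes);
        }
        // I/O happens outside the critical section, so a slow writer
        // cannot stall threads that need the registry lock.
        out.println("Number of nodes: " + snapshot.size());
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
          out.println(e.getKey() + ": " + e.getValue());
        }
      }
    }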
@@ -567,8 +563,8 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
* Remove a datanode
* @throws UnregisteredNodeException
*/
public void removeDatanode(final DatanodeID node
) throws UnregisteredNodeException {
public void removeDatanode(final DatanodeID node)
throws UnregisteredNodeException {
namesystem.writeLock();
try {
final DatanodeDescriptor descriptor = getDatanode(node);
@@ -585,19 +581,17 @@ public void removeDatanode(final DatanodeID node
/** Remove a dead datanode. */
void removeDeadDatanode(final DatanodeID nodeID) {
synchronized(datanodeMap) {
DatanodeDescriptor d;
try {
d = getDatanode(nodeID);
} catch(IOException e) {
d = null;
}
if (d != null && isDatanodeDead(d)) {
NameNode.stateChangeLog.info(
"BLOCK* removeDeadDatanode: lost heartbeat from " + d);
removeDatanode(d);
}
}
DatanodeDescriptor d;
try {
d = getDatanode(nodeID);
} catch(IOException e) {
d = null;
}
if (d != null && isDatanodeDead(d)) {
NameNode.stateChangeLog.info(
"BLOCK* removeDeadDatanode: lost heartbeat from " + d);
removeDatanode(d);
}
}
/** Is the datanode dead? */
@@ -611,14 +605,13 @@ void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
synchronized(datanodeMap) {
synchronized(this) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -629,11 +622,9 @@ void addDatanode(final DatanodeDescriptor node) {
/** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) {
final String key = node.getDatanodeUuid();
synchronized (datanodeMap) {
synchronized (this) {
host2DatanodeMap.remove(datanodeMap.remove(key));
}
// Also remove all block invalidation tasks under this node
blockManager.removeFromInvalidates(new DatanodeInfo(node));
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
+ node + "): storage " + key
@@ -645,7 +636,7 @@ private void incrementVersionCount(String version) {
if (version == null) {
return;
}
synchronized(datanodeMap) {
synchronized(this) {
Integer count = this.datanodesSoftwareVersions.get(version);
count = count == null ? 1 : count + 1;
this.datanodesSoftwareVersions.put(version, count);
@@ -656,7 +647,7 @@ private void decrementVersionCount(String version) {
if (version == null) {
return;
}
synchronized(datanodeMap) {
synchronized(this) {
Integer count = this.datanodesSoftwareVersions.get(version);
if(count != null) {
if(count > 1) {
@@ -674,24 +665,22 @@ private boolean shouldCountVersion(DatanodeDescriptor node) {
}
private void countSoftwareVersions() {
synchronized(datanodeMap) {
HashMap<String, Integer> versionCount = new HashMap<>();
synchronized(this) {
datanodesSoftwareVersions.clear();
for(DatanodeDescriptor dn: datanodeMap.values()) {
// Check isAlive too because right after removeDatanode(),
// isDatanodeDead() is still true
if(shouldCountVersion(dn))
{
Integer num = versionCount.get(dn.getSoftwareVersion());
if (shouldCountVersion(dn)) {
Integer num = datanodesSoftwareVersions.get(dn.getSoftwareVersion());
num = num == null ? 1 : num+1;
versionCount.put(dn.getSoftwareVersion(), num);
datanodesSoftwareVersions.put(dn.getSoftwareVersion(), num);
}
}
this.datanodesSoftwareVersions = versionCount;
}
}
public HashMap<String, Integer> getDatanodesSoftwareVersions() {
synchronized(datanodeMap) {
synchronized(this) {
return new HashMap<> (this.datanodesSoftwareVersions);
}
}
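
countSoftwareVersions above now rebuilds datanodesSoftwareVersions in place instead of swapping in a fresh map, which is what allows the field to become final. The get/null-check/put sequence it keeps could also be written with Map.merge on Java 8+; a small illustrative sketch, not the HDFS code:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class VersionCounter {
      // Counts occurrences of each version string.
      static Map<String, Integer> count(List<String> versions) {
        Map<String, Integer> counts = new HashMap<>();
        for (String v : versions) {
          // merge() stores 1 on first sight, otherwise adds 1 to the
          // existing count: the same effect as get()/null-check/put().
          counts.merge(v, 1, Integer::sum);
        }
        return counts;
      }
    }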
@@ -747,13 +736,11 @@ private String resolveNetworkLocation (DatanodeID node)
/**
* Resolve network locations for specified hosts
*
* @param names
* @return Network locations if available, Else returns null
*/
public List<String> resolveNetworkLocation(List<String> names) {
// resolve its network location
List<String> rName = dnsToSwitchMapping.resolve(names);
return rName;
return dnsToSwitchMapping.resolve(names);
}
/**
@@ -807,10 +794,9 @@ private List<String> getNetworkDependencies(DatanodeInfo node)
* This is used to not to display a decommissioned datanode to the operators.
* @param nodeList , array list of live or dead nodes.
*/
private void removeDecomNodeFromList(
private static void removeDecomNodeFromList(
final List<DatanodeDescriptor> nodeList) {
Iterator<DatanodeDescriptor> it=null;
for (it = nodeList.iterator(); it.hasNext();) {
for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (node.isDecommissioned()) {
it.remove();
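
removeDecomNodeFromList keeps an explicit Iterator because it removes elements while walking the list; on Java 8+ the same filtering can be expressed with Collection.removeIf, sketched below with a stand-in node type rather than DatanodeDescriptor:

    import java.util.List;

    class DecomFilter {
      // Stand-in for DatanodeDescriptor; only the predicate matters here.
      interface Node {
        boolean isDecommissioned();
      }

      static void removeDecommissioned(List<? extends Node> nodes) {
        // removeIf wraps the iterator-based removal loop shown above.
        nodes.removeIf(Node::isDecommissioned);
      }
    }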
@@ -968,6 +954,7 @@ nodes with its data cleared (or user can just remove the StorageID
// register new datanode
addDatanode(nodeDescr);
blockManager.getBlockReportLeaseManager().register(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because its is done when the descriptor is created
@@ -1030,7 +1017,11 @@ private void refreshHostsReader(Configuration conf) throws IOException {
* 4. Removed from exclude --> stop decommission.
*/
private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) {
final Map<String, DatanodeDescriptor> copy;
synchronized (this) {
copy = new HashMap<>(datanodeMap);
}
for (DatanodeDescriptor node : copy.values()) {
// Check if not include.
if (!hostFileManager.isIncluded(node)) {
node.setDisallowed(true); // case 2.
@@ -1047,7 +1038,7 @@ private void refreshDatanodes() {
/** @return the number of live datanodes. */
public int getNumLiveDataNodes() {
int numLive = 0;
synchronized (datanodeMap) {
synchronized (this) {
for(DatanodeDescriptor dn : datanodeMap.values()) {
if (!isDatanodeDead(dn) ) {
numLive++;
@@ -1252,7 +1243,7 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
synchronized(datanodeMap) {
synchronized(this) {
nodes = new ArrayList<>(datanodeMap.size());
for (DatanodeDescriptor dn : datanodeMap.values()) {
final boolean isDead = isDatanodeDead(dn);
@@ -1327,155 +1318,160 @@ private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdateMonotonic(0);
}
private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
DatanodeDescriptor nodeinfo) {
BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks == null) {
return null;
}
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.length);
for (BlockInfo b : blocks) {
BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
assert uc != null;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
// Skip stale nodes during recovery
final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<>(storages.length);
for (DatanodeStorageInfo storage : storages) {
if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storage);
}
}
// If we are performing a truncate recovery than set recovery fields
// to old block.
boolean truncateRecovery = uc.getTruncateBlock() != null;
boolean copyOnTruncateRecovery = truncateRecovery &&
uc.getTruncateBlock().getBlockId() != b.getBlockId();
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
new ExtendedBlock(blockPoolId, b);
// If we only get 1 replica after eliminating stale nodes, choose all
// replicas for recovery and let the primary data node handle failures.
DatanodeInfo[] recoveryInfos;
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : "
+ (storages.length - recoveryLocations.size()));
}
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
} else {
// If too many replicas are stale, then choose all replicas to
// participate in block recovery.
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
}
RecoveringBlock rBlock;
if (truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b : uc.getTruncateBlock();
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock);
} else {
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
}
brCommand.add(rBlock);
}
return brCommand;
}
private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
List<DatanodeCommand> cmds) {
boolean sendingCachingCommands = false;
final long nowMs = monotonicNow();
if (shouldSendCachingCommands &&
((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
timeBetweenResendingCachingDirectivesMs)) {
DatanodeCommand pendingCacheCommand = getCacheCommand(
nodeinfo.getPendingCached(), DatanodeProtocol.DNA_CACHE,
blockPoolId);
if (pendingCacheCommand != null) {
cmds.add(pendingCacheCommand);
sendingCachingCommands = true;
}
DatanodeCommand pendingUncacheCommand = getCacheCommand(
nodeinfo.getPendingUncached(), DatanodeProtocol.DNA_UNCACHE,
blockPoolId);
if (pendingUncacheCommand != null) {
cmds.add(pendingUncacheCommand);
sendingCachingCommands = true;
}
if (sendingCachingCommands) {
nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
}
}
}
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
} catch (UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
if (nodeinfo == null || !nodeinfo.isAlive()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
if(namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
if (namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
//check lease recovery
BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfo b : blocks) {
BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
assert uc != null;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<>(storages.length);
for (DatanodeStorageInfo storage : storages) {
if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storage);
}
}
// If we are performing a truncate recovery than set recovery fields
// to old block.
boolean truncateRecovery = uc.getTruncateBlock() != null;
boolean copyOnTruncateRecovery = truncateRecovery &&
uc.getTruncateBlock().getBlockId() != b.getBlockId();
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
new ExtendedBlock(blockPoolId, b);
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
DatanodeInfo[] recoveryInfos;
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
recoveryInfos =
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
}
RecoveringBlock rBlock;
if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
uc.getTruncateBlock();
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock);
} else {
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
}
brCommand.add(rBlock);
}
return new DatanodeCommand[] { brCommand };
}
// block recovery command
final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
nodeinfo);
if (brCommand != null) {
return new DatanodeCommand[]{brCommand};
}
final List<DatanodeCommand> cmds = new ArrayList<>();
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
// checking pending erasure coding tasks
List<BlockECRecoveryInfo> pendingECList =
nodeinfo.getErasureCodeCommand(maxTransfers);
if (pendingECList != null) {
cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
pendingECList));
}
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
boolean sendingCachingCommands = false;
long nowMs = monotonicNow();
if (shouldSendCachingCommands &&
((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
timeBetweenResendingCachingDirectivesMs)) {
DatanodeCommand pendingCacheCommand =
getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
DatanodeProtocol.DNA_CACHE, blockPoolId);
if (pendingCacheCommand != null) {
cmds.add(pendingCacheCommand);
sendingCachingCommands = true;
}
DatanodeCommand pendingUncacheCommand =
getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
DatanodeProtocol.DNA_UNCACHE, blockPoolId);
if (pendingUncacheCommand != null) {
cmds.add(pendingUncacheCommand);
sendingCachingCommands = true;
}
if (sendingCachingCommands) {
nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
}
}
final List<DatanodeCommand> cmds = new ArrayList<>();
// check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
// check pending erasure coding tasks
List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand(
maxTransfers);
if (pendingECList != null) {
cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY,
pendingECList));
}
// check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
blks));
}
// cache commands
addCacheCommands(blockPoolId, nodeinfo, cmds);
// key update command
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
return new DatanodeCommand[0];
@@ -1486,14 +1482,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
*
* @param list The {@link CachedBlocksList}. This function
* clears the list.
* @param datanode The datanode.
* @param action The action to perform in the command.
* @param poolId The block pool id.
* @return A DatanodeCommand to be sent back to the DN, or null if
* there is nothing to be done.
*/
private DatanodeCommand getCacheCommand(CachedBlocksList list,
DatanodeDescriptor datanode, int action, String poolId) {
private DatanodeCommand getCacheCommand(CachedBlocksList list, int action,
String poolId) {
int length = list.size();
if (length == 0) {
return null;
@ -1501,9 +1496,7 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
// Read the existing cache commands.
long[] blockIds = new long[length];
int i = 0;
for (Iterator<CachedBlock> iter = list.iterator();
iter.hasNext(); ) {
CachedBlock cachedBlock = iter.next();
for (CachedBlock cachedBlock : list) {
blockIds[i++] = cachedBlock.getBlockId();
}
return new BlockIdCommand(action, poolId, blockIds);
@@ -1524,7 +1517,7 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
* @throws IOException
*/
public void setBalancerBandwidth(long bandwidth) throws IOException {
synchronized(datanodeMap) {
synchronized(this) {
for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
nodeInfo.setBalancerBandwidth(bandwidth);
}
@@ -1533,7 +1526,7 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
public void markAllDatanodesStale() {
LOG.info("Marking all datandoes as stale");
synchronized (datanodeMap) {
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
storage.markStaleAfterFailover();
@@ -1548,7 +1541,7 @@ public void markAllDatanodesStale() {
* recoveries, and replication requests.
*/
public void clearPendingQueues() {
synchronized (datanodeMap) {
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.clearBlockQueues();
}
@@ -1560,7 +1553,7 @@ public void clearPendingQueues() {
* know about.
*/
public void resetLastCachingDirectiveSentTime() {
synchronized (datanodeMap) {
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.setLastCachingDirectiveSentTimeMs(0L);
}
@@ -1573,9 +1566,11 @@ public String toString() {
}
public void clearPendingCachingCommands() {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.getPendingCached().clear();
dn.getPendingUncached().clear();
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.getPendingCached().clear();
dn.getPendingUncached().clear();
}
}
}
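
The recurring edit in this file swaps synchronized(datanodeMap) for synchronized(this): datanodeMap, host2DatanodeMap, and datanodesSoftwareVersions are related structures, and coarsening to the DatanodeManager monitor guards them together while letting accessors such as getDatanode(String) use plain synchronized blocks on the manager. A minimal sketch of that idea, with illustrative stand-in fields:

    import java.util.HashMap;
    import java.util.Map;

    class Manager {
      private final Map<String, String> byUuid = new HashMap<>(); // guarded by "this"
      private final Map<String, String> byHost = new HashMap<>(); // guarded by "this"

      // One monitor covers every structure that must stay mutually
      // consistent, instead of locking one map while mutating the other.
      synchronized void add(String uuid, String host, String value) {
        byUuid.put(uuid, value);
        byHost.put(host, value);
      }

      synchronized String lookupByUuid(String uuid) {
        return byUuid.get(uuid);
      }

      synchronized String lookupByHost(String host) {
        return byHost.get(host);
      }
    }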

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -27,6 +27,8 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -418,9 +418,6 @@ private void logAuditEvent(boolean succeeded,
*/
private volatile boolean needRollbackFsImage;
// Block pool ID used by this namenode
private String blockPoolId;
final LeaseManager leaseManager = new LeaseManager(this);
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@@ -2348,12 +2345,11 @@ LastBlockWithStatus appendFile(String srcArg, String holder,
}
ExtendedBlock getExtendedBlock(Block blk) {
return new ExtendedBlock(blockPoolId, blk);
return new ExtendedBlock(getBlockPoolId(), blk);
}
void setBlockPoolId(String bpid) {
blockPoolId = bpid;
blockManager.setBlockPoolId(blockPoolId);
blockManager.setBlockPoolId(bpid);
}
/**
@@ -3489,11 +3485,11 @@ String getRegistrationID() {
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
* 2) Adjust usage stats for future block allocation
*
* If a substantial amount of time passed since the last datanode
* heartbeat then request an immediate block report.
*
* @return an array of datanode commands
*
* If a substantial amount of time passed since the last datanode
* heartbeat then request an immediate block report.
*
* @return an array of datanode commands
* @throws IOException
*/
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
@@ -3507,7 +3503,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
@@ -5371,7 +5367,7 @@ public String getClusterId() {
@Override // NameNodeMXBean
public String getBlockPoolId() {
return blockPoolId;
return getBlockManager().getBlockPoolId();
}
@Override // NameNodeMXBean
@@ -5960,7 +5956,7 @@ private void startRollingUpgradeInternalForNonHA(long startTime)
}
void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) {
rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId,
rollingUpgradeInfo = new RollingUpgradeInfo(getBlockPoolId(),
createdRollbackImages, startTime, 0L);
}
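
Across the FSNamesystem hunks the duplicated blockPoolId field is deleted and BlockManager becomes its single owner; every remaining read goes through getBlockPoolId(). A hedged sketch of that delegation, with illustrative class names rather than the HDFS ones:

    class PoolIdOwner {
      private String blockPoolId; // the one authoritative copy

      void setBlockPoolId(String id) {
        this.blockPoolId = id;
      }

      String getBlockPoolId() {
        return blockPoolId;
      }
    }

    class Facade {
      private final PoolIdOwner owner = new PoolIdOwner();

      // No second field here: reads and writes both go through the
      // owner, so the two copies can never drift apart.
      void setBlockPoolId(String id) {
        owner.setBlockPoolId(id);
      }

      String getBlockPoolId() {
        return owner.getBlockPoolId();
      }
    }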