From 8602692338d6f493647205e0241e4116211fab75 Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Tue, 15 Dec 2015 10:47:53 -0800
Subject: [PATCH] HDFS-9371. Code cleanup for DatanodeManager. Contributed by
 Jing Zhao.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |   2 +
 .../server/blockmanagement/BlockManager.java       |  20 +-
 .../blockmanagement/DatanodeDescriptor.java        |   8 +-
 .../blockmanagement/DatanodeManager.java           | 411 +++++++++---------
 .../blockmanagement/HeartbeatManager.java          |   2 +
 .../hdfs/server/namenode/FSNamesystem.java         |  24 +-
 6 files changed, 229 insertions(+), 238 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c2f6863bf63..ae0fdc49fe8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -899,6 +899,8 @@ Release 2.9.0 - UNRELEASED
     HDFS-9281. Change TestDeleteBlockPool to not explicitly use File to
     check block pool existence. (lei)
 
+    HDFS-9371. Code cleanup for DatanodeManager. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ae1238b32d8..929672629b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -132,6 +132,9 @@ public class BlockManager implements BlockStatsMXBean {
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
 
+  // Block pool ID used by this namenode
+  private String blockPoolId;
+
   private final PendingDataNodeMessages pendingDNMessages =
       new PendingDataNodeMessages();
 
@@ -462,11 +465,16 @@ public BlockStoragePolicy[] getStoragePolicies() {
   }
 
   public void setBlockPoolId(String blockPoolId) {
+    this.blockPoolId = blockPoolId;
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);
     }
   }
 
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
+
   public BlockStoragePolicySuite getStoragePolicySuite() {
     return storagePolicySuite;
   }
@@ -1228,18 +1236,6 @@ private Block getBlockOnStorage(BlockInfo storedBlock,
         ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) :
         storedBlock;
   }
 
-  /**
-   * Remove all block invalidation tasks under this datanode UUID;
-   * used when a datanode registers with a new UUID and the old one
-   * is wiped.
-   */
-  void removeFromInvalidates(final DatanodeInfo datanode) {
-    if (!isPopulatingReplQueues()) {
-      return;
-    }
-    invalidateBlocks.remove(datanode);
-  }
-
   /**
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index e5563eb5cc2..6709390d008 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -290,11 +290,11 @@ public void setAlive(boolean isAlive) {
     this.isAlive = isAlive;
   }
 
-  public boolean needKeyUpdate() {
+  public synchronized boolean needKeyUpdate() {
     return needKeyUpdate;
   }
 
-  public void setNeedKeyUpdate(boolean needKeyUpdate) {
+  public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
     this.needKeyUpdate = needKeyUpdate;
   }
 
@@ -868,14 +868,14 @@ public void updateRegInfo(DatanodeID nodeReg) {
   /**
    * @return balancer bandwidth in bytes per second for this datanode
    */
-  public long getBalancerBandwidth() {
+  public synchronized long getBalancerBandwidth() {
     return this.bandwidth;
   }
 
   /**
    * @param bandwidth balancer bandwidth in bytes per second for this datanode
   */
-  public void setBalancerBandwidth(long bandwidth) {
+  public synchronized void setBalancerBandwidth(long bandwidth) {
     this.bandwidth = bandwidth;
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index f758454aa91..d535397de0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -162,7 +163,7 @@ public class DatanodeManager {
    * during rolling upgrades.
    * Software version -> Number of datanodes with this version
    */
-  private HashMap<String, Integer> datanodesSoftwareVersions =
+  private final HashMap<String, Integer> datanodesSoftwareVersions =
       new HashMap<>(4, 0.75f);
 
   /**
@@ -352,15 +353,9 @@ public DatanodeStatistics getDatanodeStatistics() {
   }
 
   private boolean isInactive(DatanodeInfo datanode) {
-    if (datanode.isDecommissioned()) {
-      return true;
-    }
+    return datanode.isDecommissioned() ||
+        (avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
 
-    if (avoidStaleDataNodesForRead) {
-      return datanode.isStale(staleInterval);
-    }
-
-    return false;
   }
 
   /** Sort the located blocks by the distance to the target host. */
@@ -479,8 +474,9 @@ DatanodeDescriptor getDatanode(final String datanodeUuid) {
     if (datanodeUuid == null) {
       return null;
     }
-
-    return datanodeMap.get(datanodeUuid);
+    synchronized (this) {
+      return datanodeMap.get(datanodeUuid);
+    }
   }
 
   /**
@@ -490,8 +486,8 @@ DatanodeDescriptor getDatanode(final String datanodeUuid) {
    * @return DatanodeDescriptor or null if the node is not found.
   * @throws UnregisteredNodeException
   */
-  public DatanodeDescriptor getDatanode(DatanodeID nodeID
-      ) throws UnregisteredNodeException {
+  public DatanodeDescriptor getDatanode(DatanodeID nodeID)
+      throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
     if (node == null)
       return null;
@@ -535,13 +531,13 @@ public DatanodeStorageInfo[] getDatanodeStorageInfos(
 
   /** Prints information about all datanodes. */
   void datanodeDump(final PrintWriter out) {
-    synchronized (datanodeMap) {
-      Map<String, DatanodeDescriptor> sortedDatanodeMap =
-          new TreeMap<>(datanodeMap);
-      out.println("Metasave: Number of datanodes: " + datanodeMap.size());
-      for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
-        out.println(node.dumpDatanode());
-      }
+    final Map<String, DatanodeDescriptor> sortedDatanodeMap;
+    synchronized (this) {
+      sortedDatanodeMap = new TreeMap<>(datanodeMap);
+    }
+    out.println("Metasave: Number of datanodes: " + sortedDatanodeMap.size());
+    for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
+      out.println(node.dumpDatanode());
     }
   }
 
@@ -567,8 +563,8 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
    * Remove a datanode
    * @throws UnregisteredNodeException
   */
-  public void removeDatanode(final DatanodeID node
-      ) throws UnregisteredNodeException {
+  public void removeDatanode(final DatanodeID node)
+      throws UnregisteredNodeException {
     namesystem.writeLock();
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
@@ -585,19 +581,17 @@ public void removeDatanode(final DatanodeID node
 
   /** Remove a dead datanode. */
   void removeDeadDatanode(final DatanodeID nodeID) {
-    synchronized(datanodeMap) {
-      DatanodeDescriptor d;
-      try {
-        d = getDatanode(nodeID);
-      } catch(IOException e) {
-        d = null;
-      }
-      if (d != null && isDatanodeDead(d)) {
-        NameNode.stateChangeLog.info(
-            "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
-        removeDatanode(d);
-      }
-    }
+    DatanodeDescriptor d;
+    try {
+      d = getDatanode(nodeID);
+    } catch(IOException e) {
+      d = null;
+    }
+    if (d != null && isDatanodeDead(d)) {
+      NameNode.stateChangeLog.info(
+          "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
+      removeDatanode(d);
+    }
   }
 
   /** Is the datanode dead? */
@@ -611,14 +605,13 @@ void addDatanode(final DatanodeDescriptor node) {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
-    synchronized(datanodeMap) {
+    synchronized(this) {
       host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
     }
 
     networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
     checkIfClusterIsNowMultiRack(node);
-    blockManager.getBlockReportLeaseManager().register(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -629,11 +622,9 @@ void addDatanode(final DatanodeDescriptor node) {
 
   /** Physically remove node from datanodeMap. */
   private void wipeDatanode(final DatanodeID node) {
     final String key = node.getDatanodeUuid();
-    synchronized (datanodeMap) {
+    synchronized (this) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
-    // Also remove all block invalidation tasks under this node
-    blockManager.removeFromInvalidates(new DatanodeInfo(node));
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
           + node + "): storage " + key
@@ -645,7 +636,7 @@ private void incrementVersionCount(String version) {
     if (version == null) {
       return;
     }
-    synchronized(datanodeMap) {
+    synchronized(this) {
       Integer count = this.datanodesSoftwareVersions.get(version);
       count = count == null ? 1 : count + 1;
       this.datanodesSoftwareVersions.put(version, count);
@@ -656,7 +647,7 @@ private void decrementVersionCount(String version) {
     if (version == null) {
       return;
     }
-    synchronized(datanodeMap) {
+    synchronized(this) {
       Integer count = this.datanodesSoftwareVersions.get(version);
       if(count != null) {
         if(count > 1) {
@@ -674,24 +665,22 @@ private boolean shouldCountVersion(DatanodeDescriptor node) {
   }
 
   private void countSoftwareVersions() {
-    synchronized(datanodeMap) {
-      HashMap<String, Integer> versionCount = new HashMap<>();
+    synchronized(this) {
+      datanodesSoftwareVersions.clear();
       for(DatanodeDescriptor dn: datanodeMap.values()) {
         // Check isAlive too because right after removeDatanode(),
         // isDatanodeDead() is still true
-        if(shouldCountVersion(dn))
-        {
-          Integer num = versionCount.get(dn.getSoftwareVersion());
+        if (shouldCountVersion(dn)) {
+          Integer num = datanodesSoftwareVersions.get(dn.getSoftwareVersion());
           num = num == null ? 1 : num+1;
-          versionCount.put(dn.getSoftwareVersion(), num);
+          datanodesSoftwareVersions.put(dn.getSoftwareVersion(), num);
         }
       }
-      this.datanodesSoftwareVersions = versionCount;
     }
   }
 
   public HashMap<String, Integer> getDatanodesSoftwareVersions() {
-    synchronized(datanodeMap) {
+    synchronized(this) {
       return new HashMap<> (this.datanodesSoftwareVersions);
     }
   }
@@ -747,13 +736,11 @@ private String resolveNetworkLocation (DatanodeID node)
   /**
    * Resolve network locations for specified hosts
    *
-   * @param names
    * @return Network locations if available, Else returns null
   */
   public List<String> resolveNetworkLocation(List<String> names) {
     // resolve its network location
-    List<String> rName = dnsToSwitchMapping.resolve(names);
-    return rName;
+    return dnsToSwitchMapping.resolve(names);
   }
 
   /**
@@ -807,10 +794,9 @@ private List<String> getNetworkDependencies(DatanodeInfo node)
    * This is used to not to display a decommissioned datanode to the operators.
    * @param nodeList , array list of live or dead nodes.
   */
-  private void removeDecomNodeFromList(
+  private static void removeDecomNodeFromList(
       final List<DatanodeDescriptor> nodeList) {
-    Iterator<DatanodeDescriptor> it=null;
-    for (it = nodeList.iterator(); it.hasNext();) {
+    for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
       if (node.isDecommissioned()) {
         it.remove();
@@ -968,6 +954,7 @@ nodes with its data cleared (or user can just remove the StorageID
 
       // register new datanode
       addDatanode(nodeDescr);
+      blockManager.getBlockReportLeaseManager().register(nodeDescr);
       // also treat the registration message as a heartbeat
       // no need to update its timestamp
       // because its is done when the descriptor is created
@@ -1030,7 +1017,11 @@ private void refreshHostsReader(Configuration conf) throws IOException {
    * 4. Removed from exclude --> stop decommission.
   */
   private void refreshDatanodes() {
-    for(DatanodeDescriptor node : datanodeMap.values()) {
+    final Map<String, DatanodeDescriptor> copy;
+    synchronized (this) {
+      copy = new HashMap<>(datanodeMap);
+    }
+    for (DatanodeDescriptor node : copy.values()) {
       // Check if not include.
       if (!hostFileManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
@@ -1047,7 +1038,7 @@ private void refreshDatanodes() {
   /** @return the number of live datanodes. */
   public int getNumLiveDataNodes() {
     int numLive = 0;
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for(DatanodeDescriptor dn : datanodeMap.values()) {
         if (!isDatanodeDead(dn) ) {
           numLive++;
@@ -1252,7 +1243,7 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
     final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
     final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
 
-    synchronized(datanodeMap) {
+    synchronized(this) {
       nodes = new ArrayList<>(datanodeMap.size());
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         final boolean isDead = isDatanodeDead(dn);
@@ -1327,155 +1318,160 @@ private void setDatanodeDead(DatanodeDescriptor node) {
     node.setLastUpdateMonotonic(0);
   }
 
+  private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
+      DatanodeDescriptor nodeinfo) {
+    BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+    if (blocks == null) {
+      return null;
+    }
+    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.length);
+    for (BlockInfo b : blocks) {
+      BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
+      assert uc != null;
+      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+      // Skip stale nodes during recovery
+      final List<DatanodeStorageInfo> recoveryLocations =
+          new ArrayList<>(storages.length);
+      for (DatanodeStorageInfo storage : storages) {
+        if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+          recoveryLocations.add(storage);
+        }
+      }
+      // If we are performing a truncate recovery than set recovery fields
+      // to old block.
+      boolean truncateRecovery = uc.getTruncateBlock() != null;
+      boolean copyOnTruncateRecovery = truncateRecovery &&
+          uc.getTruncateBlock().getBlockId() != b.getBlockId();
+      ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
+          new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
+          new ExtendedBlock(blockPoolId, b);
+      // If we only get 1 replica after eliminating stale nodes, choose all
+      // replicas for recovery and let the primary data node handle failures.
+      DatanodeInfo[] recoveryInfos;
+      if (recoveryLocations.size() > 1) {
+        if (recoveryLocations.size() != storages.length) {
+          LOG.info("Skipped stale nodes for recovery : " +
+              (storages.length - recoveryLocations.size()));
+        }
+        recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
+      } else {
+        // If too many replicas are stale, then choose all replicas to
+        // participate in block recovery.
+        recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
+      }
+      RecoveringBlock rBlock;
+      if (truncateRecovery) {
+        Block recoveryBlock = (copyOnTruncateRecovery) ? b : uc.getTruncateBlock();
+        rBlock = new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock);
+      } else {
+        rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+            uc.getBlockRecoveryId());
+      }
+      brCommand.add(rBlock);
+    }
+    return brCommand;
+  }
+
+  private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
+      List<DatanodeCommand> cmds) {
+    boolean sendingCachingCommands = false;
+    final long nowMs = monotonicNow();
+    if (shouldSendCachingCommands &&
+        ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+            timeBetweenResendingCachingDirectivesMs)) {
+      DatanodeCommand pendingCacheCommand = getCacheCommand(
+          nodeinfo.getPendingCached(), DatanodeProtocol.DNA_CACHE,
+          blockPoolId);
+      if (pendingCacheCommand != null) {
+        cmds.add(pendingCacheCommand);
+        sendingCachingCommands = true;
+      }
+      DatanodeCommand pendingUncacheCommand = getCacheCommand(
+          nodeinfo.getPendingUncached(), DatanodeProtocol.DNA_UNCACHE,
+          blockPoolId);
+      if (pendingUncacheCommand != null) {
+        cmds.add(pendingUncacheCommand);
+        sendingCachingCommands = true;
+      }
+      if (sendingCachingCommands) {
+        nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+      }
+    }
+  }
+
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, final String blockPoolId,
       long cacheCapacity, long cacheUsed, int xceiverCount,
       int maxTransfers, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary) throws IOException {
-    synchronized (heartbeatManager) {
-      synchronized (datanodeMap) {
-        DatanodeDescriptor nodeinfo;
-        try {
-          nodeinfo = getDatanode(nodeReg);
-        } catch(UnregisteredNodeException e) {
-          return new DatanodeCommand[]{RegisterCommand.REGISTER};
-        }
-
-        // Check if this datanode should actually be shutdown instead.
-        if (nodeinfo != null && nodeinfo.isDisallowed()) {
-          setDatanodeDead(nodeinfo);
-          throw new DisallowedDatanodeException(nodeinfo);
-        }
+    final DatanodeDescriptor nodeinfo;
+    try {
+      nodeinfo = getDatanode(nodeReg);
+    } catch (UnregisteredNodeException e) {
+      return new DatanodeCommand[]{RegisterCommand.REGISTER};
+    }
 
-        if (nodeinfo == null || !nodeinfo.isAlive()) {
-          return new DatanodeCommand[]{RegisterCommand.REGISTER};
-        }
+    // Check if this datanode should actually be shutdown instead.
+    if (nodeinfo != null && nodeinfo.isDisallowed()) {
+      setDatanodeDead(nodeinfo);
+      throw new DisallowedDatanodeException(nodeinfo);
+    }
 
-        heartbeatManager.updateHeartbeat(nodeinfo, reports,
-            cacheCapacity, cacheUsed,
-            xceiverCount, failedVolumes,
-            volumeFailureSummary);
+    if (nodeinfo == null || !nodeinfo.isAlive()) {
+      return new DatanodeCommand[]{RegisterCommand.REGISTER};
+    }
+    heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
+        cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
 
-        // If we are in safemode, do not send back any recovery / replication
-        // requests. Don't even drain the existing queue of work.
-        if(namesystem.isInSafeMode()) {
-          return new DatanodeCommand[0];
-        }
+    // If we are in safemode, do not send back any recovery / replication
+    // requests. Don't even drain the existing queue of work.
+    if (namesystem.isInSafeMode()) {
+      return new DatanodeCommand[0];
+    }
 
-        //check lease recovery
-        BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (blocks != null) {
-          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
-              blocks.length);
-          for (BlockInfo b : blocks) {
-            BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
-            assert uc != null;
-            final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-            // Skip stale nodes during recovery - not heart beated for some time (30s by default).
-            final List<DatanodeStorageInfo> recoveryLocations =
-                new ArrayList<>(storages.length);
-            for (DatanodeStorageInfo storage : storages) {
-              if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
-                recoveryLocations.add(storage);
-              }
-            }
-            // If we are performing a truncate recovery than set recovery fields
-            // to old block.
-            boolean truncateRecovery = uc.getTruncateBlock() != null;
-            boolean copyOnTruncateRecovery = truncateRecovery &&
-                uc.getTruncateBlock().getBlockId() != b.getBlockId();
-            ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
-                new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
-                new ExtendedBlock(blockPoolId, b);
-            // If we only get 1 replica after eliminating stale nodes, then choose all
-            // replicas for recovery and let the primary data node handle failures.
-            DatanodeInfo[] recoveryInfos;
-            if (recoveryLocations.size() > 1) {
-              if (recoveryLocations.size() != storages.length) {
-                LOG.info("Skipped stale nodes for recovery : " +
-                    (storages.length - recoveryLocations.size()));
-              }
-              recoveryInfos =
-                  DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
-            } else {
-              // If too many replicas are stale, then choose all replicas to participate
-              // in block recovery.
-              recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
-            }
-            RecoveringBlock rBlock;
-            if(truncateRecovery) {
-              Block recoveryBlock = (copyOnTruncateRecovery) ? b :
-                  uc.getTruncateBlock();
-              rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
-                  recoveryBlock);
-            } else {
-              rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
-                  uc.getBlockRecoveryId());
-            }
-            brCommand.add(rBlock);
-          }
-          return new DatanodeCommand[] { brCommand };
-        }
+    // block recovery command
+    final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
+        nodeinfo);
+    if (brCommand != null) {
+      return new DatanodeCommand[]{brCommand};
+    }
 
-        final List<DatanodeCommand> cmds = new ArrayList<>();
-        //check pending replication
-        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
-            maxTransfers);
-        if (pendingList != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-              pendingList));
-        }
-        // checking pending erasure coding tasks
-        List<BlockECRecoveryInfo> pendingECList =
-            nodeinfo.getErasureCodeCommand(maxTransfers);
-        if (pendingECList != null) {
-          cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
-              pendingECList));
-        }
-        //check block invalidation
-        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
-        if (blks != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
-              blockPoolId, blks));
-        }
-        boolean sendingCachingCommands = false;
-        long nowMs = monotonicNow();
-        if (shouldSendCachingCommands &&
-            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
-                timeBetweenResendingCachingDirectivesMs)) {
-          DatanodeCommand pendingCacheCommand =
-              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
-                DatanodeProtocol.DNA_CACHE, blockPoolId);
-          if (pendingCacheCommand != null) {
-            cmds.add(pendingCacheCommand);
-            sendingCachingCommands = true;
-          }
-          DatanodeCommand pendingUncacheCommand =
-              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
-                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
-          if (pendingUncacheCommand != null) {
-            cmds.add(pendingUncacheCommand);
-            sendingCachingCommands = true;
-          }
-          if (sendingCachingCommands) {
-            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
-          }
-        }
+    final List<DatanodeCommand> cmds = new ArrayList<>();
+    // check pending replication
+    List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+        maxTransfers);
+    if (pendingList != null) {
+      cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+          pendingList));
+    }
+    // check pending erasure coding tasks
+    List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand(
+        maxTransfers);
+    if (pendingECList != null) {
+      cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY,
+          pendingECList));
+    }
+    // check block invalidation
+    Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+    if (blks != null) {
+      cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
+          blks));
+    }
+    // cache commands
+    addCacheCommands(blockPoolId, nodeinfo, cmds);
+    // key update command
+    blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
-        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
+    // check for balancer bandwidth update
+    if (nodeinfo.getBalancerBandwidth() > 0) {
+      cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
+      // set back to 0 to indicate that datanode has been sent the new value
+      nodeinfo.setBalancerBandwidth(0);
+    }
 
-        // check for balancer bandwidth update
-        if (nodeinfo.getBalancerBandwidth() > 0) {
-          cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
-          // set back to 0 to indicate that datanode has been sent the new value
-          nodeinfo.setBalancerBandwidth(0);
-        }
 
-        if (!cmds.isEmpty()) {
-          return cmds.toArray(new DatanodeCommand[cmds.size()]);
-        }
-      }
-    }
+    if (!cmds.isEmpty()) {
+      return cmds.toArray(new DatanodeCommand[cmds.size()]);
+    }
 
     return new DatanodeCommand[0];
   }
 
@@ -1486,14 +1482,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    *
    * @param list The {@link CachedBlocksList}.  This function
    *             clears the list.
-   * @param datanode The datanode.
    * @param action The action to perform in the command.
    * @param poolId The block pool id.
    * @return  A DatanodeCommand to be sent back to the DN, or null if
    *          there is nothing to be done.
   */
-  private DatanodeCommand getCacheCommand(CachedBlocksList list,
-      DatanodeDescriptor datanode, int action, String poolId) {
+  private DatanodeCommand getCacheCommand(CachedBlocksList list, int action,
+      String poolId) {
     int length = list.size();
     if (length == 0) {
       return null;
@@ -1501,9 +1496,7 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
     // Read the existing cache commands.
     long[] blockIds = new long[length];
     int i = 0;
-    for (Iterator<CachedBlock> iter = list.iterator();
-            iter.hasNext(); ) {
-      CachedBlock cachedBlock = iter.next();
+    for (CachedBlock cachedBlock : list) {
       blockIds[i++] = cachedBlock.getBlockId();
     }
     return new BlockIdCommand(action, poolId, blockIds);
@@ -1524,7 +1517,7 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
    * @throws IOException
   */
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    synchronized(datanodeMap) {
+    synchronized(this) {
       for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
         nodeInfo.setBalancerBandwidth(bandwidth);
       }
@@ -1533,7 +1526,7 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
 
   public void markAllDatanodesStale() {
     LOG.info("Marking all datandoes as stale");
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
           storage.markStaleAfterFailover();
@@ -1548,7 +1541,7 @@ public void markAllDatanodesStale() {
    * recoveries, and replication requests.
   */
   public void clearPendingQueues() {
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         dn.clearBlockQueues();
       }
@@ -1560,7 +1553,7 @@ public void clearPendingQueues() {
    * know about.
   */
   public void resetLastCachingDirectiveSentTime() {
-    synchronized (datanodeMap) {
+    synchronized (this) {
      for (DatanodeDescriptor dn : datanodeMap.values()) {
        dn.setLastCachingDirectiveSentTimeMs(0L);
      }
@@ -1573,9 +1566,11 @@ public String toString() {
   }
 
   public void clearPendingCachingCommands() {
-    for (DatanodeDescriptor dn : datanodeMap.values()) {
-      dn.getPendingCached().clear();
-      dn.getPendingUncached().clear();
+    synchronized (this) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.getPendingCached().clear();
+        dn.getPendingUncached().clear();
+      }
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d0369aa1e5b..9f23b328b85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ba6f0e1a895..b25c5f78c4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -418,9 +418,6 @@ private void logAuditEvent(boolean succeeded,
   */
   private volatile boolean needRollbackFsImage;
 
-  // Block pool ID used by this namenode
-  private String blockPoolId;
-
   final LeaseManager leaseManager = new LeaseManager(this);
 
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@@ -2348,12 +2345,11 @@ LastBlockWithStatus appendFile(String srcArg, String holder,
   }
 
   ExtendedBlock getExtendedBlock(Block blk) {
-    return new ExtendedBlock(blockPoolId, blk);
+    return new ExtendedBlock(getBlockPoolId(), blk);
   }
 
   void setBlockPoolId(String bpid) {
-    blockPoolId = bpid;
-    blockManager.setBlockPoolId(blockPoolId);
+    blockManager.setBlockPoolId(bpid);
   }
 
   /**
@@ -3489,11 +3485,11 @@ String getRegistrationID() {
    * The given node has reported in.  This method should:
    * 1) Record the heartbeat, so the datanode isn't timed out
    * 2) Adjust usage stats for future block allocation
-   * 
-   * If a substantial amount of time passed since the last datanode
-   * heartbeat then request an immediate block report.
+ * + * @return an array of datanode commands * @throws IOException */ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, @@ -3507,7 +3503,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, final int maxTransfer = blockManager.getMaxReplicationStreams() - xmitsInProgress; DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( - nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed, + nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary); long blockReportLeaseId = 0; if (requestFullBlockReportLease) { @@ -5371,7 +5367,7 @@ public String getClusterId() { @Override // NameNodeMXBean public String getBlockPoolId() { - return blockPoolId; + return getBlockManager().getBlockPoolId(); } @Override // NameNodeMXBean @@ -5960,7 +5956,7 @@ private void startRollingUpgradeInternalForNonHA(long startTime) } void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) { - rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId, + rollingUpgradeInfo = new RollingUpgradeInfo(getBlockPoolId(), createdRollbackImages, startTime, 0L); }