HDFS-9371. Code cleanup for DatanodeManager. Contributed by Jing Zhao.

(cherry picked from commit 8602692338)
This commit is contained in:
Jing Zhao 2015-12-15 10:47:53 -08:00
parent 83c72ce9f6
commit e0abb0a4e7
6 changed files with 221 additions and 231 deletions

View File

@ -26,6 +26,8 @@ Release 2.9.0 - UNRELEASED
HDFS-9281. Change TestDeleteBlockPool to not explicitly use File to check HDFS-9281. Change TestDeleteBlockPool to not explicitly use File to check
block pool existence. (lei) block pool existence. (lei)
HDFS-9371. Code cleanup for DatanodeManager. (jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -123,6 +123,9 @@ public class BlockManager implements BlockStatsMXBean {
private final HeartbeatManager heartbeatManager; private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager; private final BlockTokenSecretManager blockTokenSecretManager;
// Block pool ID used by this namenode
private String blockPoolId;
private final PendingDataNodeMessages pendingDNMessages = private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages(); new PendingDataNodeMessages();
@ -439,11 +442,16 @@ public BlockStoragePolicy[] getStoragePolicies() {
} }
public void setBlockPoolId(String blockPoolId) { public void setBlockPoolId(String blockPoolId) {
this.blockPoolId = blockPoolId;
if (isBlockTokenEnabled()) { if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId); blockTokenSecretManager.setBlockPoolId(blockPoolId);
} }
} }
/** @return the block pool ID stored by {@link #setBlockPoolId(String)}. */
public String getBlockPoolId() {
  return this.blockPoolId;
}
public BlockStoragePolicySuite getStoragePolicySuite() { public BlockStoragePolicySuite getStoragePolicySuite() {
return storagePolicySuite; return storagePolicySuite;
} }
@ -1140,18 +1148,6 @@ private void addToInvalidates(Block b) {
} }
} }
/**
 * Drops every block invalidation task queued for the given datanode.
 * Intended for the case where a datanode registers with a new UUID and
 * the old one is wiped. Nothing is removed unless the replication queues
 * are being populated.
 *
 * @param datanode the datanode whose invalidation tasks are discarded
 */
void removeFromInvalidates(final DatanodeInfo datanode) {
  if (isPopulatingReplQueues()) {
    invalidateBlocks.remove(datanode);
  }
}
/** /**
* Mark the block belonging to datanode as corrupt * Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt * @param blk Block to be marked as corrupt

View File

@ -286,11 +286,11 @@ public void setAlive(boolean isAlive) {
this.isAlive = isAlive; this.isAlive = isAlive;
} }
public boolean needKeyUpdate() { public synchronized boolean needKeyUpdate() {
return needKeyUpdate; return needKeyUpdate;
} }
public void setNeedKeyUpdate(boolean needKeyUpdate) { public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
this.needKeyUpdate = needKeyUpdate; this.needKeyUpdate = needKeyUpdate;
} }
@ -829,14 +829,14 @@ public void updateRegInfo(DatanodeID nodeReg) {
/** /**
* @return balancer bandwidth in bytes per second for this datanode * @return balancer bandwidth in bytes per second for this datanode
*/ */
public long getBalancerBandwidth() { public synchronized long getBalancerBandwidth() {
return this.bandwidth; return this.bandwidth;
} }
/** /**
* @param bandwidth balancer bandwidth in bytes per second for this datanode * @param bandwidth balancer bandwidth in bytes per second for this datanode
*/ */
public void setBalancerBandwidth(long bandwidth) { public synchronized void setBalancerBandwidth(long bandwidth) {
this.bandwidth = bandwidth; this.bandwidth = bandwidth;
} }

View File

@ -161,7 +161,7 @@ public class DatanodeManager {
* during rolling upgrades. * during rolling upgrades.
* Software version -> Number of datanodes with this version * Software version -> Number of datanodes with this version
*/ */
private HashMap<String, Integer> datanodesSoftwareVersions = private final HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<>(4, 0.75f); new HashMap<>(4, 0.75f);
/** /**
@ -351,15 +351,9 @@ public DatanodeStatistics getDatanodeStatistics() {
} }
private boolean isInactive(DatanodeInfo datanode) { private boolean isInactive(DatanodeInfo datanode) {
if (datanode.isDecommissioned()) { return datanode.isDecommissioned() ||
return true; (avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
}
if (avoidStaleDataNodesForRead) {
return datanode.isStale(staleInterval);
}
return false;
} }
/** Sort the located blocks by the distance to the target host. */ /** Sort the located blocks by the distance to the target host. */
@ -478,9 +472,10 @@ DatanodeDescriptor getDatanode(final String datanodeUuid) {
if (datanodeUuid == null) { if (datanodeUuid == null) {
return null; return null;
} }
synchronized (this) {
return datanodeMap.get(datanodeUuid); return datanodeMap.get(datanodeUuid);
} }
}
/** /**
* Get data node by datanode ID. * Get data node by datanode ID.
@ -489,8 +484,8 @@ DatanodeDescriptor getDatanode(final String datanodeUuid) {
* @return DatanodeDescriptor or null if the node is not found. * @return DatanodeDescriptor or null if the node is not found.
* @throws UnregisteredNodeException * @throws UnregisteredNodeException
*/ */
public DatanodeDescriptor getDatanode(DatanodeID nodeID public DatanodeDescriptor getDatanode(DatanodeID nodeID)
) throws UnregisteredNodeException { throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid()); final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
if (node == null) if (node == null)
return null; return null;
@ -530,15 +525,15 @@ public DatanodeStorageInfo[] getDatanodeStorageInfos(
/** Prints information about all datanodes. */ /** Prints information about all datanodes. */
void datanodeDump(final PrintWriter out) { void datanodeDump(final PrintWriter out) {
synchronized (datanodeMap) { final Map<String,DatanodeDescriptor> sortedDatanodeMap;
Map<String,DatanodeDescriptor> sortedDatanodeMap = synchronized (this) {
new TreeMap<>(datanodeMap); sortedDatanodeMap = new TreeMap<>(datanodeMap);
out.println("Metasave: Number of datanodes: " + datanodeMap.size()); }
out.println("Metasave: Number of datanodes: " + sortedDatanodeMap.size());
for (DatanodeDescriptor node : sortedDatanodeMap.values()) { for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
out.println(node.dumpDatanode()); out.println(node.dumpDatanode());
} }
} }
}
/** /**
* Remove a datanode descriptor. * Remove a datanode descriptor.
@ -562,8 +557,8 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
* Remove a datanode * Remove a datanode
* @throws UnregisteredNodeException * @throws UnregisteredNodeException
*/ */
public void removeDatanode(final DatanodeID node public void removeDatanode(final DatanodeID node)
) throws UnregisteredNodeException { throws UnregisteredNodeException {
namesystem.writeLock(); namesystem.writeLock();
try { try {
final DatanodeDescriptor descriptor = getDatanode(node); final DatanodeDescriptor descriptor = getDatanode(node);
@ -580,7 +575,6 @@ public void removeDatanode(final DatanodeID node
/** Remove a dead datanode. */ /** Remove a dead datanode. */
void removeDeadDatanode(final DatanodeID nodeID) { void removeDeadDatanode(final DatanodeID nodeID) {
synchronized(datanodeMap) {
DatanodeDescriptor d; DatanodeDescriptor d;
try { try {
d = getDatanode(nodeID); d = getDatanode(nodeID);
@ -593,7 +587,6 @@ void removeDeadDatanode(final DatanodeID nodeID) {
removeDatanode(d); removeDatanode(d);
} }
} }
}
/** Is the datanode dead? */ /** Is the datanode dead? */
boolean isDatanodeDead(DatanodeDescriptor node) { boolean isDatanodeDead(DatanodeDescriptor node) {
@ -606,14 +599,13 @@ void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap, // To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed // remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap. // from datanodeMap before adding node to host2DatanodeMap.
synchronized(datanodeMap) { synchronized(this) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node)); host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
} }
networktopology.add(node); // may throw InvalidTopologyException networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node); host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node); checkIfClusterIsNowMultiRack(node);
blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: " LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@ -624,11 +616,9 @@ void addDatanode(final DatanodeDescriptor node) {
/** Physically remove node from datanodeMap. */ /** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) { private void wipeDatanode(final DatanodeID node) {
final String key = node.getDatanodeUuid(); final String key = node.getDatanodeUuid();
synchronized (datanodeMap) { synchronized (this) {
host2DatanodeMap.remove(datanodeMap.remove(key)); host2DatanodeMap.remove(datanodeMap.remove(key));
} }
// Also remove all block invalidation tasks under this node
blockManager.removeFromInvalidates(new DatanodeInfo(node));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".wipeDatanode(" LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
+ node + "): storage " + key + node + "): storage " + key
@ -640,7 +630,7 @@ private void incrementVersionCount(String version) {
if (version == null) { if (version == null) {
return; return;
} }
synchronized(datanodeMap) { synchronized(this) {
Integer count = this.datanodesSoftwareVersions.get(version); Integer count = this.datanodesSoftwareVersions.get(version);
count = count == null ? 1 : count + 1; count = count == null ? 1 : count + 1;
this.datanodesSoftwareVersions.put(version, count); this.datanodesSoftwareVersions.put(version, count);
@ -651,7 +641,7 @@ private void decrementVersionCount(String version) {
if (version == null) { if (version == null) {
return; return;
} }
synchronized(datanodeMap) { synchronized(this) {
Integer count = this.datanodesSoftwareVersions.get(version); Integer count = this.datanodesSoftwareVersions.get(version);
if(count != null) { if(count != null) {
if(count > 1) { if(count > 1) {
@ -669,24 +659,22 @@ private boolean shouldCountVersion(DatanodeDescriptor node) {
} }
private void countSoftwareVersions() { private void countSoftwareVersions() {
synchronized(datanodeMap) { synchronized(this) {
HashMap<String, Integer> versionCount = new HashMap<>(); datanodesSoftwareVersions.clear();
for(DatanodeDescriptor dn: datanodeMap.values()) { for(DatanodeDescriptor dn: datanodeMap.values()) {
// Check isAlive too because right after removeDatanode(), // Check isAlive too because right after removeDatanode(),
// isDatanodeDead() is still true // isDatanodeDead() is still true
if(shouldCountVersion(dn)) if (shouldCountVersion(dn)) {
{ Integer num = datanodesSoftwareVersions.get(dn.getSoftwareVersion());
Integer num = versionCount.get(dn.getSoftwareVersion());
num = num == null ? 1 : num+1; num = num == null ? 1 : num+1;
versionCount.put(dn.getSoftwareVersion(), num); datanodesSoftwareVersions.put(dn.getSoftwareVersion(), num);
} }
} }
this.datanodesSoftwareVersions = versionCount;
} }
} }
public HashMap<String, Integer> getDatanodesSoftwareVersions() { public HashMap<String, Integer> getDatanodesSoftwareVersions() {
synchronized(datanodeMap) { synchronized(this) {
return new HashMap<> (this.datanodesSoftwareVersions); return new HashMap<> (this.datanodesSoftwareVersions);
} }
} }
@ -742,13 +730,11 @@ private String resolveNetworkLocation (DatanodeID node)
/** /**
* Resolve network locations for specified hosts * Resolve network locations for specified hosts
* *
* @param names
* @return Network locations if available, Else returns null * @return Network locations if available, Else returns null
*/ */
public List<String> resolveNetworkLocation(List<String> names) { public List<String> resolveNetworkLocation(List<String> names) {
// resolve its network location // resolve its network location
List<String> rName = dnsToSwitchMapping.resolve(names); return dnsToSwitchMapping.resolve(names);
return rName;
} }
/** /**
@ -802,10 +788,9 @@ private List<String> getNetworkDependencies(DatanodeInfo node)
* This is used to not to display a decommissioned datanode to the operators. * This is used to not to display a decommissioned datanode to the operators.
* @param nodeList , array list of live or dead nodes. * @param nodeList , array list of live or dead nodes.
*/ */
private void removeDecomNodeFromList( private static void removeDecomNodeFromList(
final List<DatanodeDescriptor> nodeList) { final List<DatanodeDescriptor> nodeList) {
Iterator<DatanodeDescriptor> it=null; for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
for (it = nodeList.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next(); DatanodeDescriptor node = it.next();
if (node.isDecommissioned()) { if (node.isDecommissioned()) {
it.remove(); it.remove();
@ -963,6 +948,7 @@ nodes with its data cleared (or user can just remove the StorageID
// register new datanode // register new datanode
addDatanode(nodeDescr); addDatanode(nodeDescr);
blockManager.getBlockReportLeaseManager().register(nodeDescr);
// also treat the registration message as a heartbeat // also treat the registration message as a heartbeat
// no need to update its timestamp // no need to update its timestamp
// because its is done when the descriptor is created // because its is done when the descriptor is created
@ -1025,7 +1011,11 @@ private void refreshHostsReader(Configuration conf) throws IOException {
* 4. Removed from exclude --> stop decommission. * 4. Removed from exclude --> stop decommission.
*/ */
private void refreshDatanodes() { private void refreshDatanodes() {
for(DatanodeDescriptor node : datanodeMap.values()) { final Map<String, DatanodeDescriptor> copy;
synchronized (this) {
copy = new HashMap<>(datanodeMap);
}
for (DatanodeDescriptor node : copy.values()) {
// Check if not include. // Check if not include.
if (!hostFileManager.isIncluded(node)) { if (!hostFileManager.isIncluded(node)) {
node.setDisallowed(true); // case 2. node.setDisallowed(true); // case 2.
@ -1042,7 +1032,7 @@ private void refreshDatanodes() {
/** @return the number of live datanodes. */ /** @return the number of live datanodes. */
public int getNumLiveDataNodes() { public int getNumLiveDataNodes() {
int numLive = 0; int numLive = 0;
synchronized (datanodeMap) { synchronized (this) {
for(DatanodeDescriptor dn : datanodeMap.values()) { for(DatanodeDescriptor dn : datanodeMap.values()) {
if (!isDatanodeDead(dn) ) { if (!isDatanodeDead(dn) ) {
numLive++; numLive++;
@ -1247,7 +1237,7 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes(); final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes(); final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
synchronized(datanodeMap) { synchronized(this) {
nodes = new ArrayList<>(datanodeMap.size()); nodes = new ArrayList<>(datanodeMap.size());
for (DatanodeDescriptor dn : datanodeMap.values()) { for (DatanodeDescriptor dn : datanodeMap.values()) {
final boolean isDead = isDatanodeDead(dn); final boolean isDead = isDatanodeDead(dn);
@ -1322,18 +1312,97 @@ private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdateMonotonic(0); node.setLastUpdateMonotonic(0);
} }
/**
 * Builds the block recovery command for the lease recoveries queued on the
 * given datanode.
 *
 * @param blockPoolId block pool ID used to build the recovering blocks
 * @param nodeinfo descriptor whose lease recovery queue is read
 * @return the recovery command, or null when the datanode reports no
 *         pending lease recovery
 */
private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
    DatanodeDescriptor nodeinfo) {
  final BlockInfo[] pending =
      nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
  if (pending == null) {
    return null;
  }

  final BlockRecoveryCommand command =
      new BlockRecoveryCommand(pending.length);
  for (BlockInfo block : pending) {
    final BlockUnderConstructionFeature ucFeature =
        block.getUnderConstructionFeature();
    assert ucFeature != null;
    final DatanodeStorageInfo[] expected =
        ucFeature.getExpectedStorageLocations();

    // Keep only the storages whose datanode is not stale.
    final List<DatanodeStorageInfo> fresh = new ArrayList<>(expected.length);
    for (DatanodeStorageInfo candidate : expected) {
      if (candidate.getDatanodeDescriptor().isStale(staleInterval)) {
        continue;
      }
      fresh.add(candidate);
    }

    // For a truncate recovery the recovery fields refer to the old block;
    // copy-on-truncate is the case where the truncate block has its own ID.
    final Block truncateBlock = ucFeature.getTruncateBlock();
    final boolean isTruncate = truncateBlock != null;
    final boolean isCopyOnTruncate = isTruncate
        && truncateBlock.getBlockId() != block.getBlockId();
    final ExtendedBlock primary = new ExtendedBlock(blockPoolId,
        isCopyOnTruncate ? truncateBlock : block);

    // With at most one non-stale replica left, hand every replica to the
    // primary datanode and let it deal with the failures.
    final DatanodeInfo[] targets;
    if (fresh.size() <= 1) {
      targets = DatanodeStorageInfo.toDatanodeInfos(expected);
    } else {
      if (fresh.size() != expected.length) {
        LOG.info("Skipped stale nodes for recovery : "
            + (expected.length - fresh.size()));
      }
      targets = DatanodeStorageInfo.toDatanodeInfos(fresh);
    }

    final RecoveringBlock recovering;
    if (!isTruncate) {
      recovering = new RecoveringBlock(primary, targets,
          ucFeature.getBlockRecoveryId());
    } else {
      final Block recoveryTarget = isCopyOnTruncate ? block : truncateBlock;
      recovering = new RecoveringBlock(primary, targets, recoveryTarget);
    }
    command.add(recovering);
  }
  return command;
}
/**
 * Appends the pending cache and uncache commands for a datanode to the
 * outgoing command list. Nothing is added unless caching commands are
 * enabled and the resend interval has elapsed since the last send. The
 * datanode's last-sent time is updated only when at least one command was
 * added.
 *
 * @param blockPoolId block pool ID placed in the generated commands
 * @param nodeinfo datanode whose pending cached/uncached lists are read
 * @param cmds list that receives the generated commands
 */
private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
    List<DatanodeCommand> cmds) {
  final long nowMs = monotonicNow();
  if (!shouldSendCachingCommands) {
    return;
  }
  final long sinceLastSendMs =
      nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs();
  if (sinceLastSendMs < timeBetweenResendingCachingDirectivesMs) {
    return;
  }

  final DatanodeCommand cacheCmd = getCacheCommand(
      nodeinfo.getPendingCached(), DatanodeProtocol.DNA_CACHE, blockPoolId);
  if (cacheCmd != null) {
    cmds.add(cacheCmd);
  }

  final DatanodeCommand uncacheCmd = getCacheCommand(
      nodeinfo.getPendingUncached(), DatanodeProtocol.DNA_UNCACHE,
      blockPoolId);
  if (uncacheCmd != null) {
    cmds.add(uncacheCmd);
  }

  // Record the send time only if something was actually queued.
  if (cacheCmd != null || uncacheCmd != null) {
    nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
  }
}
/** Handle heartbeat from datanodes. */ /** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId, StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount, long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes, int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException { VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) { final DatanodeDescriptor nodeinfo;
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo;
try { try {
nodeinfo = getDatanode(nodeReg); nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) { } catch (UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER}; return new DatanodeCommand[]{RegisterCommand.REGISTER};
} }
@ -1346,111 +1415,39 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
if (nodeinfo == null || !nodeinfo.isAlive()) { if (nodeinfo == null || !nodeinfo.isAlive()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER}; return new DatanodeCommand[]{RegisterCommand.REGISTER};
} }
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
// If we are in safemode, do not send back any recovery / replication // If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work. // requests. Don't even drain the existing queue of work.
if(namesystem.isInSafeMode()) { if (namesystem.isInSafeMode()) {
return new DatanodeCommand[0]; return new DatanodeCommand[0];
} }
//check lease recovery // block recovery command
BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
if (blocks != null) { nodeinfo);
BlockRecoveryCommand brCommand = new BlockRecoveryCommand( if (brCommand != null) {
blocks.length); return new DatanodeCommand[]{brCommand};
for (BlockInfo b : blocks) {
BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
assert uc != null;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<>(storages.length);
for (int i = 0; i < storages.length; i++) {
if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storages[i]);
}
}
// If we are performing a truncate recovery than set recovery fields
// to old block.
boolean truncateRecovery = uc.getTruncateBlock() != null;
boolean copyOnTruncateRecovery = truncateRecovery &&
uc.getTruncateBlock().getBlockId() != b.getBlockId();
ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
new ExtendedBlock(blockPoolId, b);
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
DatanodeInfo[] recoveryInfos;
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
recoveryInfos =
DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
}
RecoveringBlock rBlock;
if(truncateRecovery) {
Block recoveryBlock = (copyOnTruncateRecovery) ? b :
uc.getTruncateBlock();
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
recoveryBlock);
} else {
rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
uc.getBlockRecoveryId());
}
brCommand.add(rBlock);
}
return new DatanodeCommand[] { brCommand };
} }
final List<DatanodeCommand> cmds = new ArrayList<>(); final List<DatanodeCommand> cmds = new ArrayList<>();
//check pending replication // check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers); maxTransfers);
if (pendingList != null) { if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList)); pendingList));
} }
//check block invalidation // check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) { if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
blockPoolId, blks)); blks));
} }
boolean sendingCachingCommands = false; // cache commands
long nowMs = monotonicNow(); addCacheCommands(blockPoolId, nodeinfo, cmds);
if (shouldSendCachingCommands && // key update command
((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
timeBetweenResendingCachingDirectivesMs)) {
DatanodeCommand pendingCacheCommand =
getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
DatanodeProtocol.DNA_CACHE, blockPoolId);
if (pendingCacheCommand != null) {
cmds.add(pendingCacheCommand);
sendingCachingCommands = true;
}
DatanodeCommand pendingUncacheCommand =
getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
DatanodeProtocol.DNA_UNCACHE, blockPoolId);
if (pendingUncacheCommand != null) {
cmds.add(pendingUncacheCommand);
sendingCachingCommands = true;
}
if (sendingCachingCommands) {
nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
}
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo); blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update // check for balancer bandwidth update
@ -1463,8 +1460,6 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
if (!cmds.isEmpty()) { if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]); return cmds.toArray(new DatanodeCommand[cmds.size()]);
} }
}
}
return new DatanodeCommand[0]; return new DatanodeCommand[0];
} }
@ -1474,14 +1469,13 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
* *
* @param list The {@link CachedBlocksList}. This function * @param list The {@link CachedBlocksList}. This function
* clears the list. * clears the list.
* @param datanode The datanode.
* @param action The action to perform in the command. * @param action The action to perform in the command.
* @param poolId The block pool id. * @param poolId The block pool id.
* @return A DatanodeCommand to be sent back to the DN, or null if * @return A DatanodeCommand to be sent back to the DN, or null if
* there is nothing to be done. * there is nothing to be done.
*/ */
private DatanodeCommand getCacheCommand(CachedBlocksList list, private DatanodeCommand getCacheCommand(CachedBlocksList list, int action,
DatanodeDescriptor datanode, int action, String poolId) { String poolId) {
int length = list.size(); int length = list.size();
if (length == 0) { if (length == 0) {
return null; return null;
@ -1489,9 +1483,7 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
// Read the existing cache commands. // Read the existing cache commands.
long[] blockIds = new long[length]; long[] blockIds = new long[length];
int i = 0; int i = 0;
for (Iterator<CachedBlock> iter = list.iterator(); for (CachedBlock cachedBlock : list) {
iter.hasNext(); ) {
CachedBlock cachedBlock = iter.next();
blockIds[i++] = cachedBlock.getBlockId(); blockIds[i++] = cachedBlock.getBlockId();
} }
return new BlockIdCommand(action, poolId, blockIds); return new BlockIdCommand(action, poolId, blockIds);
@ -1512,7 +1504,7 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
* @throws IOException * @throws IOException
*/ */
public void setBalancerBandwidth(long bandwidth) throws IOException { public void setBalancerBandwidth(long bandwidth) throws IOException {
synchronized(datanodeMap) { synchronized(this) {
for (DatanodeDescriptor nodeInfo : datanodeMap.values()) { for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
nodeInfo.setBalancerBandwidth(bandwidth); nodeInfo.setBalancerBandwidth(bandwidth);
} }
@ -1521,7 +1513,7 @@ public void setBalancerBandwidth(long bandwidth) throws IOException {
public void markAllDatanodesStale() { public void markAllDatanodesStale() {
LOG.info("Marking all datandoes as stale"); LOG.info("Marking all datandoes as stale");
synchronized (datanodeMap) { synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) { for (DatanodeDescriptor dn : datanodeMap.values()) {
for(DatanodeStorageInfo storage : dn.getStorageInfos()) { for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
storage.markStaleAfterFailover(); storage.markStaleAfterFailover();
@ -1536,7 +1528,7 @@ public void markAllDatanodesStale() {
* recoveries, and replication requests. * recoveries, and replication requests.
*/ */
public void clearPendingQueues() { public void clearPendingQueues() {
synchronized (datanodeMap) { synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) { for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.clearBlockQueues(); dn.clearBlockQueues();
} }
@ -1548,7 +1540,7 @@ public void clearPendingQueues() {
* know about. * know about.
*/ */
public void resetLastCachingDirectiveSentTime() { public void resetLastCachingDirectiveSentTime() {
synchronized (datanodeMap) { synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) { for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.setLastCachingDirectiveSentTimeMs(0L); dn.setLastCachingDirectiveSentTimeMs(0L);
} }
@ -1561,11 +1553,13 @@ public String toString() {
} }
public void clearPendingCachingCommands() { public void clearPendingCachingCommands() {
synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) { for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.getPendingCached().clear(); dn.getPendingCached().clear();
dn.getPendingUncached().clear(); dn.getPendingUncached().clear();
} }
} }
}
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) { public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
this.shouldSendCachingCommands = shouldSendCachingCommands; this.shouldSendCachingCommands = shouldSendCachingCommands;

View File

@ -27,6 +27,8 @@
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;

View File

@ -402,9 +402,6 @@ private void logAuditEvent(boolean succeeded,
*/ */
private volatile boolean needRollbackFsImage; private volatile boolean needRollbackFsImage;
// Block pool ID used by this namenode
private String blockPoolId;
final LeaseManager leaseManager = new LeaseManager(this); final LeaseManager leaseManager = new LeaseManager(this);
Daemon nnrmthread = null; // NamenodeResourceMonitor thread Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@ -2323,12 +2320,11 @@ LastBlockWithStatus appendFile(String srcArg, String holder,
} }
ExtendedBlock getExtendedBlock(Block blk) { ExtendedBlock getExtendedBlock(Block blk) {
return new ExtendedBlock(blockPoolId, blk); return new ExtendedBlock(getBlockPoolId(), blk);
} }
void setBlockPoolId(String bpid) { void setBlockPoolId(String bpid) {
blockPoolId = bpid; blockManager.setBlockPoolId(bpid);
blockManager.setBlockPoolId(blockPoolId);
} }
/** /**
@ -3454,7 +3450,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
final int maxTransfer = blockManager.getMaxReplicationStreams() final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress; - xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed, nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary); xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
long blockReportLeaseId = 0; long blockReportLeaseId = 0;
if (requestFullBlockReportLease) { if (requestFullBlockReportLease) {
@ -5313,7 +5309,7 @@ public String getClusterId() {
@Override // NameNodeMXBean @Override // NameNodeMXBean
public String getBlockPoolId() { public String getBlockPoolId() {
return blockPoolId; return getBlockManager().getBlockPoolId();
} }
@Override // NameNodeMXBean @Override // NameNodeMXBean
@ -5902,7 +5898,7 @@ private void startRollingUpgradeInternalForNonHA(long startTime)
} }
void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) { void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) {
rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId, rollingUpgradeInfo = new RollingUpgradeInfo(getBlockPoolId(),
createdRollbackImages, startTime, 0L); createdRollbackImages, startTime, 0L);
} }