HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and DatanodeStorageInfo. Contributed by Zhe Zhang.

This commit is contained in:
Andrew Wang 2015-06-29 12:12:34 -07:00
parent d3fed8e653
commit 2ffd84273a
5 changed files with 45 additions and 34 deletions

View File

@ -682,6 +682,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
blocks. (Zhe Zhang via jing9) blocks. (Zhe Zhang via jing9)
HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and
DatanodeStorageInfo. (Zhe Zhang via wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -64,7 +64,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
// Stores status of decommissioning. // Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything. // If node is not decommissioning, do not use this object for anything.
public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus(); public final DecommissioningStatus decommissioningStatus =
new DecommissioningStatus();
private long curBlockReportId = 0; private long curBlockReportId = 0;
@ -115,7 +116,7 @@ synchronized List<E> poll(int numBlocks) {
return null; return null;
} }
List<E> results = new ArrayList<E>(); List<E> results = new ArrayList<>();
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) { for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
results.add(blockq.poll()); results.add(blockq.poll());
} }
@ -135,7 +136,7 @@ synchronized void clear() {
} }
private final Map<String, DatanodeStorageInfo> storageMap = private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<String, DatanodeStorageInfo>(); new HashMap<>();
/** /**
* A list of CachedBlock objects on this datanode. * A list of CachedBlock objects on this datanode.
@ -217,12 +218,14 @@ public CachedBlocksList getPendingUncached() {
private long bandwidth; private long bandwidth;
/** A queue of blocks to be replicated by this datanode */ /** A queue of blocks to be replicated by this datanode */
private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>(); private final BlockQueue<BlockTargetPair> replicateBlocks =
new BlockQueue<>();
/** A queue of blocks to be recovered by this datanode */ /** A queue of blocks to be recovered by this datanode */
private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks = private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
new BlockQueue<BlockInfoUnderConstruction>(); new BlockQueue<>();
/** A set of blocks to be invalidated by this datanode */ /** A set of blocks to be invalidated by this datanode */
private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>(); private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
/* Variables for maintaining number of blocks scheduled to be written to /* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger * this storage. This count is approximate and might be slightly bigger
@ -230,9 +233,9 @@ public CachedBlocksList getPendingUncached() {
* while writing the block). * while writing the block).
*/ */
private EnumCounters<StorageType> currApproxBlocksScheduled private EnumCounters<StorageType> currApproxBlocksScheduled
= new EnumCounters<StorageType>(StorageType.class); = new EnumCounters<>(StorageType.class);
private EnumCounters<StorageType> prevApproxBlocksScheduled private EnumCounters<StorageType> prevApproxBlocksScheduled
= new EnumCounters<StorageType>(StorageType.class); = new EnumCounters<>(StorageType.class);
private long lastBlocksScheduledRollTime = 0; private long lastBlocksScheduledRollTime = 0;
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
private int volumeFailures = 0; private int volumeFailures = 0;
@ -276,6 +279,7 @@ public DatanodeStorageInfo getStorageInfo(String storageID) {
return storageMap.get(storageID); return storageMap.get(storageID);
} }
} }
DatanodeStorageInfo[] getStorageInfos() { DatanodeStorageInfo[] getStorageInfos() {
synchronized (storageMap) { synchronized (storageMap) {
final Collection<DatanodeStorageInfo> storages = storageMap.values(); final Collection<DatanodeStorageInfo> storages = storageMap.values();
@ -321,7 +325,7 @@ List<DatanodeStorageInfo> removeZombieStorages() {
Long.toHexString(curBlockReportId)); Long.toHexString(curBlockReportId));
iter.remove(); iter.remove();
if (zombies == null) { if (zombies == null) {
zombies = new LinkedList<DatanodeStorageInfo>(); zombies = new LinkedList<>();
} }
zombies.add(storageInfo); zombies.add(storageInfo);
} }
@ -350,10 +354,7 @@ boolean removeBlock(BlockInfo b) {
*/ */
boolean removeBlock(String storageID, BlockInfo b) { boolean removeBlock(String storageID, BlockInfo b) {
DatanodeStorageInfo s = getStorageInfo(storageID); DatanodeStorageInfo s = getStorageInfo(storageID);
if (s != null) { return s != null && s.removeBlock(b);
return s.removeBlock(b);
}
return false;
} }
public void resetBlocks() { public void resetBlocks() {
@ -449,7 +450,7 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
+ this.volumeFailures + " to " + volFailures); + this.volumeFailures + " to " + volFailures);
synchronized (storageMap) { synchronized (storageMap) {
failedStorageInfos = failedStorageInfos =
new HashSet<DatanodeStorageInfo>(storageMap.values()); new HashSet<>(storageMap.values());
} }
} }
@ -505,7 +506,7 @@ private void pruneStorageMap(final StorageReport[] reports) {
HashMap<String, DatanodeStorageInfo> excessStorages; HashMap<String, DatanodeStorageInfo> excessStorages;
// Init excessStorages with all known storages. // Init excessStorages with all known storages.
excessStorages = new HashMap<String, DatanodeStorageInfo>(storageMap); excessStorages = new HashMap<>(storageMap);
// Remove storages that the DN reported in the heartbeat. // Remove storages that the DN reported in the heartbeat.
for (final StorageReport report : reports) { for (final StorageReport report : reports) {
@ -542,7 +543,7 @@ private static class BlockIterator implements Iterator<BlockInfo> {
private final List<Iterator<BlockInfo>> iterators; private final List<Iterator<BlockInfo>> iterators;
private BlockIterator(final DatanodeStorageInfo... storages) { private BlockIterator(final DatanodeStorageInfo... storages) {
List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>(); List<Iterator<BlockInfo>> iterators = new ArrayList<>();
for (DatanodeStorageInfo e : storages) { for (DatanodeStorageInfo e : storages) {
iterators.add(e.getBlockIterator()); iterators.add(e.getBlockIterator());
} }

View File

@ -85,7 +85,7 @@ public class DatanodeManager {
* Mapping: StorageID -> DatanodeDescriptor * Mapping: StorageID -> DatanodeDescriptor
*/ */
private final Map<String, DatanodeDescriptor> datanodeMap private final Map<String, DatanodeDescriptor> datanodeMap
= new HashMap<String, DatanodeDescriptor>(); = new HashMap<>();
/** Cluster network topology */ /** Cluster network topology */
private final NetworkTopology networktopology; private final NetworkTopology networktopology;
@ -162,7 +162,7 @@ public class DatanodeManager {
* Software version -> Number of datanodes with this version * Software version -> Number of datanodes with this version
*/ */
private HashMap<String, Integer> datanodesSoftwareVersions = private HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<String, Integer>(4, 0.75f); new HashMap<>(4, 0.75f);
/** /**
* The minimum time between resending caching directives to Datanodes, * The minimum time between resending caching directives to Datanodes,
@ -217,7 +217,7 @@ public class DatanodeManager {
// locations of those hosts in the include list and store the mapping // locations of those hosts in the include list and store the mapping
// in the cache; so future calls to resolve will be fast. // in the cache; so future calls to resolve will be fast.
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
final ArrayList<String> locations = new ArrayList<String>(); final ArrayList<String> locations = new ArrayList<>();
for (InetSocketAddress addr : hostFileManager.getIncludes()) { for (InetSocketAddress addr : hostFileManager.getIncludes()) {
locations.add(addr.getAddress().getHostAddress()); locations.add(addr.getAddress().getHostAddress());
} }
@ -370,7 +370,7 @@ public void sortLocatedBlocks(final String targethost,
// here we should get node but not datanode only . // here we should get node but not datanode only .
Node client = getDatanodeByHost(targethost); Node client = getDatanodeByHost(targethost);
if (client == null) { if (client == null) {
List<String> hosts = new ArrayList<String> (1); List<String> hosts = new ArrayList<> (1);
hosts.add(targethost); hosts.add(targethost);
List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts); List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
if (resolvedHosts != null && !resolvedHosts.isEmpty()) { if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
@ -522,7 +522,7 @@ public DatanodeStorageInfo[] getDatanodeStorageInfos(
void datanodeDump(final PrintWriter out) { void datanodeDump(final PrintWriter out) {
synchronized (datanodeMap) { synchronized (datanodeMap) {
Map<String,DatanodeDescriptor> sortedDatanodeMap = Map<String,DatanodeDescriptor> sortedDatanodeMap =
new TreeMap<String,DatanodeDescriptor>(datanodeMap); new TreeMap<>(datanodeMap);
out.println("Metasave: Number of datanodes: " + datanodeMap.size()); out.println("Metasave: Number of datanodes: " + datanodeMap.size());
for (DatanodeDescriptor node : sortedDatanodeMap.values()) { for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
out.println(node.dumpDatanode()); out.println(node.dumpDatanode());
@ -660,7 +660,7 @@ private boolean shouldCountVersion(DatanodeDescriptor node) {
private void countSoftwareVersions() { private void countSoftwareVersions() {
synchronized(datanodeMap) { synchronized(datanodeMap) {
HashMap<String, Integer> versionCount = new HashMap<String, Integer>(); HashMap<String, Integer> versionCount = new HashMap<>();
for(DatanodeDescriptor dn: datanodeMap.values()) { for(DatanodeDescriptor dn: datanodeMap.values()) {
// Check isAlive too because right after removeDatanode(), // Check isAlive too because right after removeDatanode(),
// isDatanodeDead() is still true // isDatanodeDead() is still true
@ -677,7 +677,7 @@ private void countSoftwareVersions() {
public HashMap<String, Integer> getDatanodesSoftwareVersions() { public HashMap<String, Integer> getDatanodesSoftwareVersions() {
synchronized(datanodeMap) { synchronized(datanodeMap) {
return new HashMap<String, Integer> (this.datanodesSoftwareVersions); return new HashMap<> (this.datanodesSoftwareVersions);
} }
} }
@ -710,7 +710,7 @@ private String resolveNetworkLocationWithFallBackToDefaultLocation (
*/ */
private String resolveNetworkLocation (DatanodeID node) private String resolveNetworkLocation (DatanodeID node)
throws UnresolvedTopologyException { throws UnresolvedTopologyException {
List<String> names = new ArrayList<String>(1); List<String> names = new ArrayList<>(1);
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
names.add(node.getIpAddr()); names.add(node.getIpAddr());
} else { } else {
@ -1000,7 +1000,7 @@ nodes with its data cleared (or user can just remove the StorageID
// If the network location is invalid, clear the cached mappings // If the network location is invalid, clear the cached mappings
// so that we have a chance to re-add this DataNode with the // so that we have a chance to re-add this DataNode with the
// correct network location later. // correct network location later.
List<String> invalidNodeNames = new ArrayList<String>(3); List<String> invalidNodeNames = new ArrayList<>(3);
// clear cache for nodes in IP or Hostname // clear cache for nodes in IP or Hostname
invalidNodeNames.add(nodeReg.getIpAddr()); invalidNodeNames.add(nodeReg.getIpAddr());
invalidNodeNames.add(nodeReg.getHostName()); invalidNodeNames.add(nodeReg.getHostName());
@ -1275,7 +1275,7 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes(); final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
synchronized(datanodeMap) { synchronized(datanodeMap) {
nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size()); nodes = new ArrayList<>(datanodeMap.size());
for (DatanodeDescriptor dn : datanodeMap.values()) { for (DatanodeDescriptor dn : datanodeMap.values()) {
final boolean isDead = isDatanodeDead(dn); final boolean isDead = isDatanodeDead(dn);
final boolean isDecommissioning = dn.isDecommissionInProgress(); final boolean isDecommissioning = dn.isDecommissionInProgress();
@ -1351,7 +1351,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
VolumeFailureSummary volumeFailureSummary) throws IOException { VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) { synchronized (heartbeatManager) {
synchronized (datanodeMap) { synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null; DatanodeDescriptor nodeinfo;
try { try {
nodeinfo = getDatanode(nodeReg); nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) { } catch(UnregisteredNodeException e) {
@ -1389,7 +1389,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
// Skip stale nodes during recovery - not heart beated for some time (30s by default). // Skip stale nodes during recovery - not heart beated for some time (30s by default).
final List<DatanodeStorageInfo> recoveryLocations = final List<DatanodeStorageInfo> recoveryLocations =
new ArrayList<DatanodeStorageInfo>(storages.length); new ArrayList<>(storages.length);
for (int i = 0; i < storages.length; i++) { for (int i = 0; i < storages.length; i++) {
if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) { if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
recoveryLocations.add(storages[i]); recoveryLocations.add(storages[i]);
@ -1431,7 +1431,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
return new DatanodeCommand[] { brCommand }; return new DatanodeCommand[] { brCommand };
} }
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(); final List<DatanodeCommand> cmds = new ArrayList<>();
//check pending replication //check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand( List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers); maxTransfers);

View File

@ -37,8 +37,9 @@
public class DatanodeStorageInfo { public class DatanodeStorageInfo {
public static final DatanodeStorageInfo[] EMPTY_ARRAY = {}; public static final DatanodeStorageInfo[] EMPTY_ARRAY = {};
public static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) { public static DatanodeInfo[] toDatanodeInfos(
return toDatanodeInfos(Arrays.asList(storages)); DatanodeStorageInfo[] storages) {
return storages == null ? null: toDatanodeInfos(Arrays.asList(storages));
} }
static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) { static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()]; final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()];
@ -58,6 +59,9 @@ static DatanodeDescriptor[] toDatanodeDescriptors(
} }
public static String[] toStorageIDs(DatanodeStorageInfo[] storages) { public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
if (storages == null) {
return null;
}
String[] storageIDs = new String[storages.length]; String[] storageIDs = new String[storages.length];
for(int i = 0; i < storageIDs.length; i++) { for(int i = 0; i < storageIDs.length; i++) {
storageIDs[i] = storages[i].getStorageID(); storageIDs[i] = storages[i].getStorageID();
@ -66,6 +70,9 @@ public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
} }
public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) { public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) {
if (storages == null) {
return null;
}
StorageType[] storageTypes = new StorageType[storages.length]; StorageType[] storageTypes = new StorageType[storages.length];
for(int i = 0; i < storageTypes.length; i++) { for(int i = 0; i < storageTypes.length; i++) {
storageTypes[i] = storages[i].getStorageType(); storageTypes[i] = storages[i].getStorageType();
@ -380,6 +387,6 @@ static DatanodeStorageInfo getDatanodeStorageInfo(
} }
static enum AddBlockResult { static enum AddBlockResult {
ADDED, REPLACED, ALREADY_EXIST; ADDED, REPLACED, ALREADY_EXIST
} }
} }