HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and DatanodeStorageInfo. Contributed by Zhe Zhang.
(cherry picked from commit 2ffd84273a
)
This commit is contained in:
parent
e40e9fc7f4
commit
ada7f66e12
|
@ -346,6 +346,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
|
HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
|
||||||
blocks. (Zhe Zhang via jing9)
|
blocks. (Zhe Zhang via jing9)
|
||||||
|
|
||||||
|
HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and
|
||||||
|
DatanodeStorageInfo. (Zhe Zhang via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -66,7 +66,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
|
|
||||||
// Stores status of decommissioning.
|
// Stores status of decommissioning.
|
||||||
// If node is not decommissioning, do not use this object for anything.
|
// If node is not decommissioning, do not use this object for anything.
|
||||||
public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
|
public final DecommissioningStatus decommissioningStatus =
|
||||||
|
new DecommissioningStatus();
|
||||||
|
|
||||||
private long curBlockReportId = 0;
|
private long curBlockReportId = 0;
|
||||||
|
|
||||||
|
@ -117,7 +118,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
List<E> results = new ArrayList<E>();
|
List<E> results = new ArrayList<>();
|
||||||
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
|
for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
|
||||||
results.add(blockq.poll());
|
results.add(blockq.poll());
|
||||||
}
|
}
|
||||||
|
@ -137,7 +138,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Map<String, DatanodeStorageInfo> storageMap =
|
private final Map<String, DatanodeStorageInfo> storageMap =
|
||||||
new HashMap<String, DatanodeStorageInfo>();
|
new HashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A list of CachedBlock objects on this datanode.
|
* A list of CachedBlock objects on this datanode.
|
||||||
|
@ -219,12 +220,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
private long bandwidth;
|
private long bandwidth;
|
||||||
|
|
||||||
/** A queue of blocks to be replicated by this datanode */
|
/** A queue of blocks to be replicated by this datanode */
|
||||||
private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
|
private final BlockQueue<BlockTargetPair> replicateBlocks =
|
||||||
|
new BlockQueue<>();
|
||||||
/** A queue of blocks to be recovered by this datanode */
|
/** A queue of blocks to be recovered by this datanode */
|
||||||
private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
|
private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
|
||||||
new BlockQueue<BlockInfoUnderConstruction>();
|
new BlockQueue<>();
|
||||||
/** A set of blocks to be invalidated by this datanode */
|
/** A set of blocks to be invalidated by this datanode */
|
||||||
private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
|
private final LightWeightHashSet<Block> invalidateBlocks =
|
||||||
|
new LightWeightHashSet<>();
|
||||||
|
|
||||||
/* Variables for maintaining number of blocks scheduled to be written to
|
/* Variables for maintaining number of blocks scheduled to be written to
|
||||||
* this storage. This count is approximate and might be slightly bigger
|
* this storage. This count is approximate and might be slightly bigger
|
||||||
|
@ -232,9 +235,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
* while writing the block).
|
* while writing the block).
|
||||||
*/
|
*/
|
||||||
private EnumCounters<StorageType> currApproxBlocksScheduled
|
private EnumCounters<StorageType> currApproxBlocksScheduled
|
||||||
= new EnumCounters<StorageType>(StorageType.class);
|
= new EnumCounters<>(StorageType.class);
|
||||||
private EnumCounters<StorageType> prevApproxBlocksScheduled
|
private EnumCounters<StorageType> prevApproxBlocksScheduled
|
||||||
= new EnumCounters<StorageType>(StorageType.class);
|
= new EnumCounters<>(StorageType.class);
|
||||||
private long lastBlocksScheduledRollTime = 0;
|
private long lastBlocksScheduledRollTime = 0;
|
||||||
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
|
private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
|
||||||
private int volumeFailures = 0;
|
private int volumeFailures = 0;
|
||||||
|
@ -278,6 +281,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
return storageMap.get(storageID);
|
return storageMap.get(storageID);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
DatanodeStorageInfo[] getStorageInfos() {
|
DatanodeStorageInfo[] getStorageInfos() {
|
||||||
synchronized (storageMap) {
|
synchronized (storageMap) {
|
||||||
final Collection<DatanodeStorageInfo> storages = storageMap.values();
|
final Collection<DatanodeStorageInfo> storages = storageMap.values();
|
||||||
|
@ -323,7 +327,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
Long.toHexString(curBlockReportId));
|
Long.toHexString(curBlockReportId));
|
||||||
iter.remove();
|
iter.remove();
|
||||||
if (zombies == null) {
|
if (zombies == null) {
|
||||||
zombies = new LinkedList<DatanodeStorageInfo>();
|
zombies = new LinkedList<>();
|
||||||
}
|
}
|
||||||
zombies.add(storageInfo);
|
zombies.add(storageInfo);
|
||||||
}
|
}
|
||||||
|
@ -352,10 +356,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
*/
|
*/
|
||||||
boolean removeBlock(String storageID, BlockInfo b) {
|
boolean removeBlock(String storageID, BlockInfo b) {
|
||||||
DatanodeStorageInfo s = getStorageInfo(storageID);
|
DatanodeStorageInfo s = getStorageInfo(storageID);
|
||||||
if (s != null) {
|
return s != null && s.removeBlock(b);
|
||||||
return s.removeBlock(b);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void resetBlocks() {
|
public void resetBlocks() {
|
||||||
|
@ -451,7 +452,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
+ this.volumeFailures + " to " + volFailures);
|
+ this.volumeFailures + " to " + volFailures);
|
||||||
synchronized (storageMap) {
|
synchronized (storageMap) {
|
||||||
failedStorageInfos =
|
failedStorageInfos =
|
||||||
new HashSet<DatanodeStorageInfo>(storageMap.values());
|
new HashSet<>(storageMap.values());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -507,7 +508,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
HashMap<String, DatanodeStorageInfo> excessStorages;
|
HashMap<String, DatanodeStorageInfo> excessStorages;
|
||||||
|
|
||||||
// Init excessStorages with all known storages.
|
// Init excessStorages with all known storages.
|
||||||
excessStorages = new HashMap<String, DatanodeStorageInfo>(storageMap);
|
excessStorages = new HashMap<>(storageMap);
|
||||||
|
|
||||||
// Remove storages that the DN reported in the heartbeat.
|
// Remove storages that the DN reported in the heartbeat.
|
||||||
for (final StorageReport report : reports) {
|
for (final StorageReport report : reports) {
|
||||||
|
@ -544,7 +545,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
private final List<Iterator<BlockInfo>> iterators;
|
private final List<Iterator<BlockInfo>> iterators;
|
||||||
|
|
||||||
private BlockIterator(final DatanodeStorageInfo... storages) {
|
private BlockIterator(final DatanodeStorageInfo... storages) {
|
||||||
List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
|
List<Iterator<BlockInfo>> iterators = new ArrayList<>();
|
||||||
for (DatanodeStorageInfo e : storages) {
|
for (DatanodeStorageInfo e : storages) {
|
||||||
iterators.add(e.getBlockIterator());
|
iterators.add(e.getBlockIterator());
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class DatanodeManager {
|
||||||
* Mapping: StorageID -> DatanodeDescriptor
|
* Mapping: StorageID -> DatanodeDescriptor
|
||||||
*/
|
*/
|
||||||
private final Map<String, DatanodeDescriptor> datanodeMap
|
private final Map<String, DatanodeDescriptor> datanodeMap
|
||||||
= new HashMap<String, DatanodeDescriptor>();
|
= new HashMap<>();
|
||||||
|
|
||||||
/** Cluster network topology */
|
/** Cluster network topology */
|
||||||
private final NetworkTopology networktopology;
|
private final NetworkTopology networktopology;
|
||||||
|
@ -162,7 +162,7 @@ public class DatanodeManager {
|
||||||
* Software version -> Number of datanodes with this version
|
* Software version -> Number of datanodes with this version
|
||||||
*/
|
*/
|
||||||
private HashMap<String, Integer> datanodesSoftwareVersions =
|
private HashMap<String, Integer> datanodesSoftwareVersions =
|
||||||
new HashMap<String, Integer>(4, 0.75f);
|
new HashMap<>(4, 0.75f);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The minimum time between resending caching directives to Datanodes,
|
* The minimum time between resending caching directives to Datanodes,
|
||||||
|
@ -217,7 +217,7 @@ public class DatanodeManager {
|
||||||
// locations of those hosts in the include list and store the mapping
|
// locations of those hosts in the include list and store the mapping
|
||||||
// in the cache; so future calls to resolve will be fast.
|
// in the cache; so future calls to resolve will be fast.
|
||||||
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
||||||
final ArrayList<String> locations = new ArrayList<String>();
|
final ArrayList<String> locations = new ArrayList<>();
|
||||||
for (InetSocketAddress addr : hostFileManager.getIncludes()) {
|
for (InetSocketAddress addr : hostFileManager.getIncludes()) {
|
||||||
locations.add(addr.getAddress().getHostAddress());
|
locations.add(addr.getAddress().getHostAddress());
|
||||||
}
|
}
|
||||||
|
@ -370,7 +370,7 @@ public class DatanodeManager {
|
||||||
// here we should get node but not datanode only .
|
// here we should get node but not datanode only .
|
||||||
Node client = getDatanodeByHost(targethost);
|
Node client = getDatanodeByHost(targethost);
|
||||||
if (client == null) {
|
if (client == null) {
|
||||||
List<String> hosts = new ArrayList<String> (1);
|
List<String> hosts = new ArrayList<> (1);
|
||||||
hosts.add(targethost);
|
hosts.add(targethost);
|
||||||
List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
|
List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
|
||||||
if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
|
if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
|
||||||
|
@ -522,7 +522,7 @@ public class DatanodeManager {
|
||||||
void datanodeDump(final PrintWriter out) {
|
void datanodeDump(final PrintWriter out) {
|
||||||
synchronized (datanodeMap) {
|
synchronized (datanodeMap) {
|
||||||
Map<String,DatanodeDescriptor> sortedDatanodeMap =
|
Map<String,DatanodeDescriptor> sortedDatanodeMap =
|
||||||
new TreeMap<String,DatanodeDescriptor>(datanodeMap);
|
new TreeMap<>(datanodeMap);
|
||||||
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
|
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
|
||||||
for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
|
for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
|
||||||
out.println(node.dumpDatanode());
|
out.println(node.dumpDatanode());
|
||||||
|
@ -660,7 +660,7 @@ public class DatanodeManager {
|
||||||
|
|
||||||
private void countSoftwareVersions() {
|
private void countSoftwareVersions() {
|
||||||
synchronized(datanodeMap) {
|
synchronized(datanodeMap) {
|
||||||
HashMap<String, Integer> versionCount = new HashMap<String, Integer>();
|
HashMap<String, Integer> versionCount = new HashMap<>();
|
||||||
for(DatanodeDescriptor dn: datanodeMap.values()) {
|
for(DatanodeDescriptor dn: datanodeMap.values()) {
|
||||||
// Check isAlive too because right after removeDatanode(),
|
// Check isAlive too because right after removeDatanode(),
|
||||||
// isDatanodeDead() is still true
|
// isDatanodeDead() is still true
|
||||||
|
@ -677,7 +677,7 @@ public class DatanodeManager {
|
||||||
|
|
||||||
public HashMap<String, Integer> getDatanodesSoftwareVersions() {
|
public HashMap<String, Integer> getDatanodesSoftwareVersions() {
|
||||||
synchronized(datanodeMap) {
|
synchronized(datanodeMap) {
|
||||||
return new HashMap<String, Integer> (this.datanodesSoftwareVersions);
|
return new HashMap<> (this.datanodesSoftwareVersions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,7 +710,7 @@ public class DatanodeManager {
|
||||||
*/
|
*/
|
||||||
private String resolveNetworkLocation (DatanodeID node)
|
private String resolveNetworkLocation (DatanodeID node)
|
||||||
throws UnresolvedTopologyException {
|
throws UnresolvedTopologyException {
|
||||||
List<String> names = new ArrayList<String>(1);
|
List<String> names = new ArrayList<>(1);
|
||||||
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
|
||||||
names.add(node.getIpAddr());
|
names.add(node.getIpAddr());
|
||||||
} else {
|
} else {
|
||||||
|
@ -999,7 +999,7 @@ public class DatanodeManager {
|
||||||
// If the network location is invalid, clear the cached mappings
|
// If the network location is invalid, clear the cached mappings
|
||||||
// so that we have a chance to re-add this DataNode with the
|
// so that we have a chance to re-add this DataNode with the
|
||||||
// correct network location later.
|
// correct network location later.
|
||||||
List<String> invalidNodeNames = new ArrayList<String>(3);
|
List<String> invalidNodeNames = new ArrayList<>(3);
|
||||||
// clear cache for nodes in IP or Hostname
|
// clear cache for nodes in IP or Hostname
|
||||||
invalidNodeNames.add(nodeReg.getIpAddr());
|
invalidNodeNames.add(nodeReg.getIpAddr());
|
||||||
invalidNodeNames.add(nodeReg.getHostName());
|
invalidNodeNames.add(nodeReg.getHostName());
|
||||||
|
@ -1274,7 +1274,7 @@ public class DatanodeManager {
|
||||||
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
|
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
|
||||||
|
|
||||||
synchronized(datanodeMap) {
|
synchronized(datanodeMap) {
|
||||||
nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size());
|
nodes = new ArrayList<>(datanodeMap.size());
|
||||||
for (DatanodeDescriptor dn : datanodeMap.values()) {
|
for (DatanodeDescriptor dn : datanodeMap.values()) {
|
||||||
final boolean isDead = isDatanodeDead(dn);
|
final boolean isDead = isDatanodeDead(dn);
|
||||||
final boolean isDecommissioning = dn.isDecommissionInProgress();
|
final boolean isDecommissioning = dn.isDecommissionInProgress();
|
||||||
|
@ -1350,7 +1350,7 @@ public class DatanodeManager {
|
||||||
VolumeFailureSummary volumeFailureSummary) throws IOException {
|
VolumeFailureSummary volumeFailureSummary) throws IOException {
|
||||||
synchronized (heartbeatManager) {
|
synchronized (heartbeatManager) {
|
||||||
synchronized (datanodeMap) {
|
synchronized (datanodeMap) {
|
||||||
DatanodeDescriptor nodeinfo = null;
|
DatanodeDescriptor nodeinfo;
|
||||||
try {
|
try {
|
||||||
nodeinfo = getDatanode(nodeReg);
|
nodeinfo = getDatanode(nodeReg);
|
||||||
} catch(UnregisteredNodeException e) {
|
} catch(UnregisteredNodeException e) {
|
||||||
|
@ -1388,7 +1388,7 @@ public class DatanodeManager {
|
||||||
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
|
final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
|
||||||
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
|
// Skip stale nodes during recovery - not heart beated for some time (30s by default).
|
||||||
final List<DatanodeStorageInfo> recoveryLocations =
|
final List<DatanodeStorageInfo> recoveryLocations =
|
||||||
new ArrayList<DatanodeStorageInfo>(storages.length);
|
new ArrayList<>(storages.length);
|
||||||
for (int i = 0; i < storages.length; i++) {
|
for (int i = 0; i < storages.length; i++) {
|
||||||
if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
|
if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
|
||||||
recoveryLocations.add(storages[i]);
|
recoveryLocations.add(storages[i]);
|
||||||
|
@ -1430,7 +1430,7 @@ public class DatanodeManager {
|
||||||
return new DatanodeCommand[] { brCommand };
|
return new DatanodeCommand[] { brCommand };
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
|
final List<DatanodeCommand> cmds = new ArrayList<>();
|
||||||
//check pending replication
|
//check pending replication
|
||||||
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
|
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
|
||||||
maxTransfers);
|
maxTransfers);
|
||||||
|
|
|
@ -37,8 +37,9 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
public class DatanodeStorageInfo {
|
public class DatanodeStorageInfo {
|
||||||
public static final DatanodeStorageInfo[] EMPTY_ARRAY = {};
|
public static final DatanodeStorageInfo[] EMPTY_ARRAY = {};
|
||||||
|
|
||||||
public static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
|
public static DatanodeInfo[] toDatanodeInfos(
|
||||||
return toDatanodeInfos(Arrays.asList(storages));
|
DatanodeStorageInfo[] storages) {
|
||||||
|
return storages == null ? null: toDatanodeInfos(Arrays.asList(storages));
|
||||||
}
|
}
|
||||||
static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
|
static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
|
||||||
final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()];
|
final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()];
|
||||||
|
@ -58,6 +59,9 @@ public class DatanodeStorageInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
|
public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
|
||||||
|
if (storages == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
String[] storageIDs = new String[storages.length];
|
String[] storageIDs = new String[storages.length];
|
||||||
for(int i = 0; i < storageIDs.length; i++) {
|
for(int i = 0; i < storageIDs.length; i++) {
|
||||||
storageIDs[i] = storages[i].getStorageID();
|
storageIDs[i] = storages[i].getStorageID();
|
||||||
|
@ -66,6 +70,9 @@ public class DatanodeStorageInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) {
|
public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) {
|
||||||
|
if (storages == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
StorageType[] storageTypes = new StorageType[storages.length];
|
StorageType[] storageTypes = new StorageType[storages.length];
|
||||||
for(int i = 0; i < storageTypes.length; i++) {
|
for(int i = 0; i < storageTypes.length; i++) {
|
||||||
storageTypes[i] = storages[i].getStorageType();
|
storageTypes[i] = storages[i].getStorageType();
|
||||||
|
@ -380,6 +387,6 @@ public class DatanodeStorageInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
static enum AddBlockResult {
|
static enum AddBlockResult {
|
||||||
ADDED, REPLACED, ALREADY_EXIST;
|
ADDED, REPLACED, ALREADY_EXIST
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue