HDDS-1048. Remove SCMNodeStat from SCMNodeManager and use storage information from DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.
This commit is contained in:
parent
e0ab1bdece
commit
fb8c997a68
|
@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto
|
|||
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -38,7 +39,6 @@ public class DatanodeInfo extends DatanodeDetails {
|
|||
private volatile long lastHeartbeatTime;
|
||||
private long lastStatsUpdatedTime;
|
||||
|
||||
// If required we can dissect StorageReportProto and store the raw data
|
||||
private List<StorageReportProto> storageReports;
|
||||
|
||||
/**
|
||||
|
@ -48,8 +48,9 @@ public class DatanodeInfo extends DatanodeDetails {
|
|||
*/
|
||||
public DatanodeInfo(DatanodeDetails datanodeDetails) {
|
||||
super(datanodeDetails);
|
||||
lock = new ReentrantReadWriteLock();
|
||||
lastHeartbeatTime = Time.monotonicNow();
|
||||
this.lock = new ReentrantReadWriteLock();
|
||||
this.lastHeartbeatTime = Time.monotonicNow();
|
||||
this.storageReports = Collections.emptyList();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -58,7 +58,6 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
|
|||
@Override
|
||||
public void onMessage(DatanodeDetails datanodeDetails,
|
||||
EventPublisher publisher) {
|
||||
nodeManager.processDeadNode(datanodeDetails.getUuid());
|
||||
|
||||
// TODO: check if there are any pipeline on this node and fire close
|
||||
// pipeline event
|
||||
|
|
|
@ -93,8 +93,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
|||
* Return a map of node stats.
|
||||
* @return a map of individual node stats (live/stale but not dead).
|
||||
*/
|
||||
// TODO: try to change the return type to Map<DatanodeDetails, SCMNodeStat>
|
||||
Map<UUID, SCMNodeStat> getNodeStats();
|
||||
Map<DatanodeDetails, SCMNodeStat> getNodeStats();
|
||||
|
||||
/**
|
||||
* Return the node stat of the specified datanode.
|
||||
|
@ -159,17 +158,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
|||
/**
|
||||
* Process node report.
|
||||
*
|
||||
* @param dnUuid
|
||||
* @param datanodeDetails
|
||||
* @param nodeReport
|
||||
*/
|
||||
void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport);
|
||||
|
||||
/**
|
||||
* Process a dead node event in this Node Manager.
|
||||
*
|
||||
* @param dnUuid datanode uuid.
|
||||
*/
|
||||
void processDeadNode(UUID dnUuid);
|
||||
void processNodeReport(DatanodeDetails datanodeDetails,
|
||||
NodeReportProto nodeReport);
|
||||
|
||||
/**
|
||||
* Get list of SCMCommands in the Command Queue for a particular Datanode.
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
|||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.node.states.*;
|
||||
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
|
||||
|
@ -275,20 +274,6 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
return nodeStateMap.getNodeInfo(datanodeDetails.getUuid());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get information about the node.
|
||||
*
|
||||
* @param datanodeUUID datanode UUID
|
||||
*
|
||||
* @return DatanodeInfo
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not present
|
||||
*/
|
||||
public DatanodeInfo getNode(UUID datanodeUUID)
|
||||
throws NodeNotFoundException {
|
||||
return nodeStateMap.getNodeInfo(datanodeUUID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the last heartbeat time of the node.
|
||||
*
|
||||
|
@ -319,7 +304,7 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
*
|
||||
* @return list of healthy nodes
|
||||
*/
|
||||
public List<DatanodeDetails> getHealthyNodes() {
|
||||
public List<DatanodeInfo> getHealthyNodes() {
|
||||
return getNodes(NodeState.HEALTHY);
|
||||
}
|
||||
|
||||
|
@ -328,7 +313,7 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
*
|
||||
* @return list of stale nodes
|
||||
*/
|
||||
public List<DatanodeDetails> getStaleNodes() {
|
||||
public List<DatanodeInfo> getStaleNodes() {
|
||||
return getNodes(NodeState.STALE);
|
||||
}
|
||||
|
||||
|
@ -337,7 +322,7 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
*
|
||||
* @return list of dead nodes
|
||||
*/
|
||||
public List<DatanodeDetails> getDeadNodes() {
|
||||
public List<DatanodeInfo> getDeadNodes() {
|
||||
return getNodes(NodeState.DEAD);
|
||||
}
|
||||
|
||||
|
@ -348,12 +333,12 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
*
|
||||
* @return list of nodes
|
||||
*/
|
||||
public List<DatanodeDetails> getNodes(NodeState state) {
|
||||
List<DatanodeDetails> nodes = new ArrayList<>();
|
||||
public List<DatanodeInfo> getNodes(NodeState state) {
|
||||
List<DatanodeInfo> nodes = new ArrayList<>();
|
||||
nodeStateMap.getNodes(state).forEach(
|
||||
uuid -> {
|
||||
try {
|
||||
nodes.add(nodeStateMap.getNodeDetails(uuid));
|
||||
nodes.add(nodeStateMap.getNodeInfo(uuid));
|
||||
} catch (NodeNotFoundException e) {
|
||||
// This should not happen unless someone else other than
|
||||
// NodeStateManager is directly modifying NodeStateMap and removed
|
||||
|
@ -369,12 +354,12 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
*
|
||||
* @return all the managed nodes
|
||||
*/
|
||||
public List<DatanodeDetails> getAllNodes() {
|
||||
List<DatanodeDetails> nodes = new ArrayList<>();
|
||||
public List<DatanodeInfo> getAllNodes() {
|
||||
List<DatanodeInfo> nodes = new ArrayList<>();
|
||||
nodeStateMap.getAllNodes().forEach(
|
||||
uuid -> {
|
||||
try {
|
||||
nodes.add(nodeStateMap.getNodeDetails(uuid));
|
||||
nodes.add(nodeStateMap.getNodeInfo(uuid));
|
||||
} catch (NodeNotFoundException e) {
|
||||
// This should not happen unless someone else other than
|
||||
// NodeStateManager is directly modifying NodeStateMap and removed
|
||||
|
@ -441,38 +426,6 @@ public class NodeStateManager implements Runnable, Closeable {
|
|||
return nodeStateMap.getTotalNodeCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current stats of the node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @return SCMNodeStat
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not present
|
||||
*/
|
||||
public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
|
||||
return nodeStateMap.getNodeStat(uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a unmodifiable copy of nodeStats.
|
||||
* @return map with node stats.
|
||||
*/
|
||||
public Map<UUID, SCMNodeStat> getNodeStatsMap() {
|
||||
return nodeStateMap.getNodeStats();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the stat for the node.
|
||||
*
|
||||
* @param uuid node id.
|
||||
*
|
||||
* @param newstat new stat that will set to the specify node.
|
||||
*/
|
||||
public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
|
||||
nodeStateMap.setNodeStat(uuid, newstat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a pipeline from the node2PipelineMap.
|
||||
* @param pipeline - Pipeline to be removed
|
||||
|
|
|
@ -19,7 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
|
@ -65,6 +66,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Maintains information about the Datanodes on SCM side.
|
||||
|
@ -80,18 +82,13 @@ import java.util.UUID;
|
|||
* get functions in this file as a snap-shot of information that is inconsistent
|
||||
* as soon as you read it.
|
||||
*/
|
||||
public class SCMNodeManager
|
||||
implements NodeManager, StorageContainerNodeProtocol {
|
||||
public class SCMNodeManager implements NodeManager {
|
||||
|
||||
@VisibleForTesting
|
||||
static final Logger LOG =
|
||||
LoggerFactory.getLogger(SCMNodeManager.class);
|
||||
|
||||
private final NodeStateManager nodeStateManager;
|
||||
// Should we maintain aggregated stats? If this is not frequently used, we
|
||||
// can always calculate it from nodeStats whenever required.
|
||||
// Aggregated node stats
|
||||
private SCMNodeStat scmStat;
|
||||
private final String clusterID;
|
||||
private final VersionInfo version;
|
||||
private final CommandQueue commandQueue;
|
||||
|
@ -108,7 +105,6 @@ public class SCMNodeManager
|
|||
StorageContainerManager scmManager, EventPublisher eventPublisher)
|
||||
throws IOException {
|
||||
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
|
||||
this.scmStat = new SCMNodeStat();
|
||||
this.clusterID = clusterID;
|
||||
this.version = VersionInfo.getLatestVersion();
|
||||
this.commandQueue = new CommandQueue();
|
||||
|
@ -131,7 +127,7 @@ public class SCMNodeManager
|
|||
|
||||
|
||||
/**
|
||||
* Gets all datanodes that are in a certain state. This function works by
|
||||
* Returns all datanode that are in the given state. This function works by
|
||||
* taking a snapshot of the current collection and then returning the list
|
||||
* from that collection. This means that real map might have changed by the
|
||||
* time we return this list.
|
||||
|
@ -140,7 +136,8 @@ public class SCMNodeManager
|
|||
*/
|
||||
@Override
|
||||
public List<DatanodeDetails> getNodes(NodeState nodestate) {
|
||||
return nodeStateManager.getNodes(nodestate);
|
||||
return nodeStateManager.getNodes(nodestate).stream()
|
||||
.map(node -> (DatanodeDetails)node).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -150,13 +147,14 @@ public class SCMNodeManager
|
|||
*/
|
||||
@Override
|
||||
public List<DatanodeDetails> getAllNodes() {
|
||||
return nodeStateManager.getAllNodes();
|
||||
return nodeStateManager.getAllNodes().stream()
|
||||
.map(node -> (DatanodeDetails)node).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Number of Datanodes by State they are in.
|
||||
*
|
||||
* @return int -- count
|
||||
* @return count
|
||||
*/
|
||||
@Override
|
||||
public int getNodeCount(NodeState nodestate) {
|
||||
|
@ -166,7 +164,7 @@ public class SCMNodeManager
|
|||
/**
|
||||
* Returns the node state of a specific node.
|
||||
*
|
||||
* @param datanodeDetails - Datanode Details
|
||||
* @param datanodeDetails Datanode Details
|
||||
* @return Healthy/Stale/Dead/Unknown.
|
||||
*/
|
||||
@Override
|
||||
|
@ -179,47 +177,6 @@ public class SCMNodeManager
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
|
||||
SCMNodeStat stat;
|
||||
try {
|
||||
stat = nodeStateManager.getNodeStat(dnId);
|
||||
|
||||
// Updating the storage report for the datanode.
|
||||
// I dont think we will get NotFound exception, as we are taking
|
||||
// nodeInfo from nodeStateMap, as I see it is not being removed from
|
||||
// the map, just we change the states. And during first time
|
||||
// registration we call this, after adding to nodeStateMap. And also
|
||||
// from eventhandler it is called only if it has node Report.
|
||||
DatanodeInfo datanodeInfo = nodeStateManager.getNode(dnId);
|
||||
if (nodeReport != null) {
|
||||
datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
|
||||
}
|
||||
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
|
||||
"dead datanode {}", dnId);
|
||||
stat = new SCMNodeStat();
|
||||
}
|
||||
|
||||
if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
|
||||
long totalCapacity = 0;
|
||||
long totalRemaining = 0;
|
||||
long totalScmUsed = 0;
|
||||
List<StorageReportProto> storageReports = nodeReport
|
||||
.getStorageReportList();
|
||||
for (StorageReportProto report : storageReports) {
|
||||
totalCapacity += report.getCapacity();
|
||||
totalRemaining += report.getRemaining();
|
||||
totalScmUsed+= report.getScmUsed();
|
||||
}
|
||||
scmStat.subtract(stat);
|
||||
stat.set(totalCapacity, totalScmUsed, totalRemaining);
|
||||
scmStat.add(stat);
|
||||
}
|
||||
nodeStateManager.setNodeStat(dnId, stat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes this stream and releases any system resources associated with it. If
|
||||
* the stream is already closed then invoking this method has no effect.
|
||||
|
@ -275,7 +232,7 @@ public class SCMNodeManager
|
|||
try {
|
||||
nodeStateManager.addNode(datanodeDetails);
|
||||
// Updating Node Report, as registration is successful
|
||||
updateNodeStat(datanodeDetails.getUuid(), nodeReport);
|
||||
processNodeReport(datanodeDetails, nodeReport);
|
||||
LOG.info("Registered Data node : {}", datanodeDetails);
|
||||
} catch (NodeAlreadyExistsException e) {
|
||||
LOG.trace("Datanode is already registered. Datanode: {}",
|
||||
|
@ -321,13 +278,21 @@ public class SCMNodeManager
|
|||
/**
|
||||
* Process node report.
|
||||
*
|
||||
* @param dnUuid
|
||||
* @param datanodeDetails
|
||||
* @param nodeReport
|
||||
*/
|
||||
@Override
|
||||
public void processNodeReport(DatanodeDetails dnUuid,
|
||||
public void processNodeReport(DatanodeDetails datanodeDetails,
|
||||
NodeReportProto nodeReport) {
|
||||
this.updateNodeStat(dnUuid.getUuid(), nodeReport);
|
||||
try {
|
||||
DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
|
||||
if (nodeReport != null) {
|
||||
datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
|
||||
}
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.warn("Got node report from unregistered datanode {}",
|
||||
datanodeDetails);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -336,7 +301,16 @@ public class SCMNodeManager
|
|||
*/
|
||||
@Override
|
||||
public SCMNodeStat getStats() {
|
||||
return new SCMNodeStat(this.scmStat);
|
||||
long capacity = 0L;
|
||||
long used = 0L;
|
||||
long remaining = 0L;
|
||||
|
||||
for (SCMNodeStat stat : getNodeStats().values()) {
|
||||
capacity += stat.getCapacity().get();
|
||||
used += stat.getScmUsed().get();
|
||||
remaining += stat.getRemaining().get();
|
||||
}
|
||||
return new SCMNodeStat(capacity, used, remaining);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -344,8 +318,24 @@ public class SCMNodeManager
|
|||
* @return a map of individual node stats (live/stale but not dead).
|
||||
*/
|
||||
@Override
|
||||
public Map<UUID, SCMNodeStat> getNodeStats() {
|
||||
return nodeStateManager.getNodeStatsMap();
|
||||
public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
|
||||
|
||||
final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();
|
||||
|
||||
final List<DatanodeInfo> healthyNodes = nodeStateManager
|
||||
.getNodes(NodeState.HEALTHY);
|
||||
final List<DatanodeInfo> staleNodes = nodeStateManager
|
||||
.getNodes(NodeState.STALE);
|
||||
final List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
|
||||
datanodes.addAll(staleNodes);
|
||||
|
||||
for (DatanodeInfo dnInfo : datanodes) {
|
||||
SCMNodeStat nodeStat = getNodeStatInternal(dnInfo);
|
||||
if (nodeStat != null) {
|
||||
nodeStats.put(dnInfo, nodeStat);
|
||||
}
|
||||
}
|
||||
return nodeStats;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -356,11 +346,28 @@ public class SCMNodeManager
|
|||
*/
|
||||
@Override
|
||||
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
|
||||
final SCMNodeStat nodeStat = getNodeStatInternal(datanodeDetails);
|
||||
return nodeStat != null ? new SCMNodeMetric(nodeStat) : null;
|
||||
}
|
||||
|
||||
private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) {
|
||||
try {
|
||||
return new SCMNodeMetric(
|
||||
nodeStateManager.getNodeStat(datanodeDetails.getUuid()));
|
||||
long capacity = 0L;
|
||||
long used = 0L;
|
||||
long remaining = 0L;
|
||||
|
||||
final DatanodeInfo datanodeInfo = nodeStateManager
|
||||
.getNode(datanodeDetails);
|
||||
final List<StorageReportProto> storageReportProtos = datanodeInfo
|
||||
.getStorageReports();
|
||||
for (StorageReportProto reportProto : storageReportProtos) {
|
||||
capacity += reportProto.getCapacity();
|
||||
used += reportProto.getScmUsed();
|
||||
remaining += reportProto.getRemaining();
|
||||
}
|
||||
return new SCMNodeStat(capacity, used, remaining);
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}",
|
||||
LOG.warn("Cannot generate NodeStat, datanode {} not found.",
|
||||
datanodeDetails.getUuid());
|
||||
return null;
|
||||
}
|
||||
|
@ -375,6 +382,8 @@ public class SCMNodeManager
|
|||
return nodeCountMap;
|
||||
}
|
||||
|
||||
// We should introduce DISK, SSD, etc., notion in
|
||||
// SCMNodeStat and try to use it.
|
||||
@Override
|
||||
public Map<String, Long> getNodeInfo() {
|
||||
long diskCapacity = 0L;
|
||||
|
@ -385,14 +394,15 @@ public class SCMNodeManager
|
|||
long ssdUsed = 0L;
|
||||
long ssdRemaining = 0L;
|
||||
|
||||
List<DatanodeDetails> healthyNodes = getNodes(NodeState.HEALTHY);
|
||||
List<DatanodeDetails> staleNodes = getNodes(NodeState.STALE);
|
||||
List<DatanodeInfo> healthyNodes = nodeStateManager
|
||||
.getNodes(NodeState.HEALTHY);
|
||||
List<DatanodeInfo> staleNodes = nodeStateManager
|
||||
.getNodes(NodeState.STALE);
|
||||
|
||||
List<DatanodeDetails> datanodes = new ArrayList<>(healthyNodes);
|
||||
List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
|
||||
datanodes.addAll(staleNodes);
|
||||
|
||||
for (DatanodeDetails datanodeDetails : datanodes) {
|
||||
DatanodeInfo dnInfo = (DatanodeInfo) datanodeDetails;
|
||||
for (DatanodeInfo dnInfo : datanodes) {
|
||||
List<StorageReportProto> storageReportProtos = dnInfo.getStorageReports();
|
||||
for (StorageReportProto reportProto : storageReportProtos) {
|
||||
if (reportProto.getStorageType() ==
|
||||
|
@ -498,27 +508,6 @@ public class SCMNodeManager
|
|||
commandForDatanode.getCommand());
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the node stats and cluster storage stats in this SCM Node Manager.
|
||||
*
|
||||
* @param dnUuid datanode uuid.
|
||||
*/
|
||||
@Override
|
||||
// TODO: This should be removed.
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
try {
|
||||
SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);
|
||||
if (stat != null) {
|
||||
LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
|
||||
scmStat.subtract(stat);
|
||||
stat.set(0, 0, 0);
|
||||
}
|
||||
} catch (NodeNotFoundException e) {
|
||||
LOG.warn("Can't update stats based on message of dead Datanode {}, it"
|
||||
+ " doesn't exist or decommissioned already.", dnUuid);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SCMCommand> getCommandQueue(UUID dnID) {
|
||||
return commandQueue.getCommand(dnID);
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node.states;
|
|||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
||||
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
|
||||
|
||||
import java.util.*;
|
||||
|
@ -45,10 +44,6 @@ public class NodeStateMap {
|
|||
* Represents the current state of node.
|
||||
*/
|
||||
private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap;
|
||||
/**
|
||||
* Represents the current stats of node.
|
||||
*/
|
||||
private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
|
||||
/**
|
||||
* Node to set of containers on the node.
|
||||
*/
|
||||
|
@ -63,7 +58,6 @@ public class NodeStateMap {
|
|||
lock = new ReentrantReadWriteLock();
|
||||
nodeMap = new ConcurrentHashMap<>();
|
||||
stateMap = new ConcurrentHashMap<>();
|
||||
nodeStats = new ConcurrentHashMap<>();
|
||||
nodeToContainer = new ConcurrentHashMap<>();
|
||||
initStateMap();
|
||||
}
|
||||
|
@ -94,7 +88,6 @@ public class NodeStateMap {
|
|||
throw new NodeAlreadyExistsException("Node UUID: " + id);
|
||||
}
|
||||
nodeMap.put(id, new DatanodeInfo(datanodeDetails));
|
||||
nodeStats.put(id, new SCMNodeStat());
|
||||
nodeToContainer.put(id, Collections.emptySet());
|
||||
stateMap.get(nodeState).add(id);
|
||||
} finally {
|
||||
|
@ -126,20 +119,6 @@ public class NodeStateMap {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns DatanodeDetails for the given node id.
|
||||
*
|
||||
* @param uuid Node Id
|
||||
*
|
||||
* @return DatanodeDetails of the node
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not present
|
||||
*/
|
||||
public DatanodeDetails getNodeDetails(UUID uuid)
|
||||
throws NodeNotFoundException {
|
||||
return getNodeInfo(uuid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns DatanodeInfo for the given node id.
|
||||
*
|
||||
|
@ -245,43 +224,6 @@ public class NodeStateMap {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current stats of the node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @return SCMNodeStat of the specify node.
|
||||
*
|
||||
* @throws NodeNotFoundException if the node is not found
|
||||
*/
|
||||
public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
|
||||
SCMNodeStat stat = nodeStats.get(uuid);
|
||||
if (stat == null) {
|
||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
||||
}
|
||||
return stat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a unmodifiable copy of nodeStats.
|
||||
*
|
||||
* @return map with node stats.
|
||||
*/
|
||||
public Map<UUID, SCMNodeStat> getNodeStats() {
|
||||
return Collections.unmodifiableMap(nodeStats);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the current stats of the node.
|
||||
*
|
||||
* @param uuid node id
|
||||
*
|
||||
* @param newstat stat that will set to the specify node.
|
||||
*/
|
||||
public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
|
||||
nodeStats.put(uuid, newstat);
|
||||
}
|
||||
|
||||
public void setContainers(UUID uuid, Set<ContainerID> containers)
|
||||
throws NodeNotFoundException{
|
||||
if (!nodeToContainer.containsKey(uuid)) {
|
||||
|
|
|
@ -76,7 +76,7 @@ public class MockNodeManager implements NodeManager {
|
|||
private final List<DatanodeDetails> healthyNodes;
|
||||
private final List<DatanodeDetails> staleNodes;
|
||||
private final List<DatanodeDetails> deadNodes;
|
||||
private final Map<UUID, SCMNodeStat> nodeMetricMap;
|
||||
private final Map<DatanodeDetails, SCMNodeStat> nodeMetricMap;
|
||||
private final SCMNodeStat aggregateStat;
|
||||
private boolean chillmode;
|
||||
private final Map<UUID, List<SCMCommand>> commandMap;
|
||||
|
@ -114,7 +114,7 @@ public class MockNodeManager implements NodeManager {
|
|||
newStat.set(
|
||||
(NODES[x % NODES.length].capacity),
|
||||
(NODES[x % NODES.length].used), remaining);
|
||||
this.nodeMetricMap.put(datanodeDetails.getUuid(), newStat);
|
||||
this.nodeMetricMap.put(datanodeDetails, newStat);
|
||||
aggregateStat.add(newStat);
|
||||
|
||||
if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) {
|
||||
|
@ -201,7 +201,7 @@ public class MockNodeManager implements NodeManager {
|
|||
* @return a list of individual node stats (live/stale but not dead).
|
||||
*/
|
||||
@Override
|
||||
public Map<UUID, SCMNodeStat> getNodeStats() {
|
||||
public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
|
||||
return nodeMetricMap;
|
||||
}
|
||||
|
||||
|
@ -213,7 +213,7 @@ public class MockNodeManager implements NodeManager {
|
|||
*/
|
||||
@Override
|
||||
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
|
||||
SCMNodeStat stat = nodeMetricMap.get(datanodeDetails.getUuid());
|
||||
SCMNodeStat stat = nodeMetricMap.get(datanodeDetails);
|
||||
if (stat == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -413,12 +413,12 @@ public class MockNodeManager implements NodeManager {
|
|||
* @param size number of bytes.
|
||||
*/
|
||||
public void addContainer(DatanodeDetails datanodeDetails, long size) {
|
||||
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
|
||||
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails);
|
||||
if (stat != null) {
|
||||
aggregateStat.subtract(stat);
|
||||
stat.getCapacity().add(size);
|
||||
aggregateStat.add(stat);
|
||||
nodeMetricMap.put(datanodeDetails.getUuid(), stat);
|
||||
nodeMetricMap.put(datanodeDetails, stat);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -429,12 +429,12 @@ public class MockNodeManager implements NodeManager {
|
|||
* @param size number of bytes.
|
||||
*/
|
||||
public void delContainer(DatanodeDetails datanodeDetails, long size) {
|
||||
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
|
||||
SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails);
|
||||
if (stat != null) {
|
||||
aggregateStat.subtract(stat);
|
||||
stat.getCapacity().subtract(size);
|
||||
aggregateStat.add(stat);
|
||||
nodeMetricMap.put(datanodeDetails.getUuid(), stat);
|
||||
nodeMetricMap.put(datanodeDetails, stat);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -445,21 +445,6 @@ public class MockNodeManager implements NodeManager {
|
|||
commandForDatanode.getCommand());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the node stats and update the storage stats
|
||||
* in this Node Manager.
|
||||
*
|
||||
* @param dnUuid UUID of the datanode.
|
||||
*/
|
||||
@Override
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
SCMNodeStat stat = this.nodeMetricMap.get(dnUuid);
|
||||
if (stat != null) {
|
||||
aggregateStat.subtract(stat);
|
||||
stat.set(0, 0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SCMCommand> getCommandQueue(UUID dnID) {
|
||||
return null;
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto
|
|||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
||||
import org.apache.hadoop.hdds.scm.HddsTestUtils;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerID;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerManager;
|
||||
|
@ -81,6 +82,10 @@ public class TestDeadNodeHandler {
|
|||
storageDir = GenericTestUtils.getTempPath(
|
||||
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
|
||||
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||
conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "100ms");
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, "50ms");
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, "1s");
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, "2s");
|
||||
eventQueue = new EventQueue();
|
||||
scm = HddsTestUtils.getScm(conf);
|
||||
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
|
||||
|
@ -237,20 +242,21 @@ public class TestDeadNodeHandler {
|
|||
Assert.assertTrue(nodeStat.get().getRemaining().get() == 90);
|
||||
Assert.assertTrue(nodeStat.get().getScmUsed().get() == 10);
|
||||
|
||||
//WHEN datanode1 is dead.
|
||||
eventQueue.fireEvent(SCMEvents.DEAD_NODE, datanode1);
|
||||
Thread.sleep(100);
|
||||
//TODO: Support logic to mark a node as dead in NodeManager.
|
||||
|
||||
nodeManager.processHeartbeat(datanode2);
|
||||
Thread.sleep(1000);
|
||||
nodeManager.processHeartbeat(datanode2);
|
||||
Thread.sleep(1000);
|
||||
nodeManager.processHeartbeat(datanode2);
|
||||
Thread.sleep(1000);
|
||||
nodeManager.processHeartbeat(datanode2);
|
||||
//THEN statistics in SCM should changed.
|
||||
stat = nodeManager.getStats();
|
||||
Assert.assertTrue(stat.getCapacity().get() == 200);
|
||||
Assert.assertTrue(stat.getRemaining().get() == 180);
|
||||
Assert.assertTrue(stat.getScmUsed().get() == 20);
|
||||
|
||||
nodeStat = nodeManager.getNodeStat(datanode1);
|
||||
Assert.assertTrue(nodeStat.get().getCapacity().get() == 0);
|
||||
Assert.assertTrue(nodeStat.get().getRemaining().get() == 0);
|
||||
Assert.assertTrue(nodeStat.get().getScmUsed().get() == 0);
|
||||
Assert.assertEquals(200L, stat.getCapacity().get().longValue());
|
||||
Assert.assertEquals(180L,
|
||||
stat.getRemaining().get().longValue());
|
||||
Assert.assertEquals(20L, stat.getScmUsed().get().longValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -63,8 +63,7 @@ public class TestNodeReportHandler implements EventPublisher {
|
|||
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
|
||||
Assert.assertNull(nodeMetric);
|
||||
|
||||
nodeReportHandler.onMessage(
|
||||
getNodeReport(dn, storageOne), this);
|
||||
nodeManager.register(dn, getNodeReport(dn, storageOne).getReport(), null);
|
||||
nodeMetric = nodeManager.getNodeStat(dn);
|
||||
|
||||
Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100);
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ReplicationNodeManagerMock implements NodeManager {
|
|||
* @return a map of individual node stats (live/stale but not dead).
|
||||
*/
|
||||
@Override
|
||||
public Map<UUID, SCMNodeStat> getNodeStats() {
|
||||
public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -305,15 +305,6 @@ public class ReplicationNodeManagerMock implements NodeManager {
|
|||
// do nothing.
|
||||
}
|
||||
|
||||
/**
|
||||
* Empty implementation for processDeadNode.
|
||||
* @param dnUuid
|
||||
*/
|
||||
@Override
|
||||
public void processDeadNode(UUID dnUuid) {
|
||||
// do nothing.
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<SCMCommand> getCommandQueue(UUID dnID) {
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue