HDFS-10897. Ozone: SCM: Add NodeManager. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-09-29 10:36:25 -07:00 committed by Owen O'Malley
parent e3aa2b687c
commit 4c95c3d6ab
7 changed files with 1715 additions and 0 deletions

View File

@ -32,6 +32,12 @@
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*; import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
/** /**
* Utility methods for Ozone and Container Clients. * Utility methods for Ozone and Container Clients.
@ -238,4 +244,138 @@ static Optional<Integer> getPortNumberFromConfigKeys(
services.put(OZONE_SCM_SERVICE_ID, serviceInstances); services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
return services; return services;
} }
/**
* Checks that a given value is with a range.
*
* For example, sanitizeUserArgs(17, 3, 5, 10)
* ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10.
*
* @param valueTocheck - value to check
* @param baseValue - the base value that is being used.
* @param minFactor - range min - a 2 here makes us ensure that value
* valueTocheck is at least twice the baseValue.
* @param maxFactor - range max
* @return long
*/
private static long sanitizeUserArgs(long valueTocheck, long baseValue,
long minFactor, long maxFactor)
throws IllegalArgumentException {
if ((valueTocheck >= (baseValue * minFactor)) &&
(valueTocheck <= (baseValue * maxFactor))) {
return valueTocheck;
}
String errMsg = String.format("%d is not within min = %d or max = " +
"%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor);
throw new IllegalArgumentException(errMsg);
}
/**
* Returns the interval in which the heartbeat processor thread runs.
*
* @param conf - Configuration
* @return long in Milliseconds.
*/
public static long getScmheartbeatCheckerInterval(Configuration conf) {
return conf.getLong(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT);
}
/**
* Heartbeat Interval - Defines the heartbeat frequency from a datanode to
* SCM.
*
* @param conf - Ozone Config
* @return - HB interval in seconds.
*/
public static int getScmHeartbeatInterval(Configuration conf) {
return conf.getInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT);
}
/**
* Get the Stale Node interval, which is used by SCM to flag a datanode as
* stale, if the heartbeat from that node has been missing for this duration.
*
* @param conf - Configuration.
* @return - Long, Milliseconds to wait before flagging a node as stale.
*/
public static long getStaleNodeInterval(Configuration conf) {
long staleNodeIntevalMs = conf.getLong(OZONE_SCM_STALENODE_INTERVAL_MS,
OZONE_SCM_STALENODE_INTERVAL_DEFAULT);
long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000;
// Make sure that StaleNodeInterval is configured way above the frequency
// at which we run the heartbeat thread.
//
// Here we check that staleNodeInterval is at least five times more than the
// frequency at which the accounting thread is going to run.
try {
sanitizeUserArgs(staleNodeIntevalMs, heartbeatThreadFrequencyMs, 5, 1000);
} catch (IllegalArgumentException ex) {
LOG.error("Stale Node Interval MS is cannot be honored due to " +
"mis-configured {}. ex: {}",
OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, ex);
throw ex;
}
// Make sure that stale node value is greater than configured value that
// datanodes are going to send HBs.
try {
sanitizeUserArgs(staleNodeIntevalMs, heartbeatIntervalMs, 3, 1000);
} catch (IllegalArgumentException ex) {
LOG.error("Stale Node Interval MS is cannot be honored due to " +
"mis-configured {}. ex: {}",
OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, ex);
throw ex;
}
return staleNodeIntevalMs;
}
/**
* Gets the interval for dead node flagging. This has to be a value that is
* greater than stale node value, and by transitive relation we also know
* that this value is greater than heartbeat interval and heartbeatProcess
* Interval.
*
* @param conf
* @return
*/
public static long getDeadNodeInterval(Configuration conf) {
long staleNodeIntervalMs = getStaleNodeInterval(conf);
long deadNodeIntervalMs = conf.getLong(
OZONE_SCM_DEADNODE_INTERVAL_MS, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT);
try {
// Make sure that dead nodes Ms is at least twice the time for staleNodes
// with a max of 1000 times the staleNodes.
sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000);
} catch (IllegalArgumentException ex) {
LOG.error("Dead Node Interval MS is cannot be honored due to " +
"mis-configured {}. ex: {}",
OZONE_SCM_STALENODE_INTERVAL_MS, ex);
throw ex;
}
return deadNodeIntervalMs;
}
/**
* Returns the maximum number of heartbeat to process per loop of the process
* thread.
* @param conf Configration
* @return - int -- Number of HBs to process
*/
public static int getMaxHBToProcessPerLoop(Configuration conf){
return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
}
} }

View File

@ -70,6 +70,31 @@ public final class OzoneConfigKeys {
"ozone.scm.handler.count.key"; "ozone.scm.handler.count.key";
public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10; public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10;
public static final String OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS =
"ozone.scm.heartbeat.interval.seconds";
public static final int OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT =
30;
public static final String OZONE_SCM_DEADNODE_INTERVAL_MS =
"ozone.scm.dead.node.interval.ms";
public static final long OZONE_SCM_DEADNODE_INTERVAL_DEFAULT =
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 20L;
public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS =
"ozone.scm.max.hb.count.to.process";
public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000;
public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS =
"ozone.scm.heartbeat.thread.interval.ms";
public static final long OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT =
3000;
public static final String OZONE_SCM_STALENODE_INTERVAL_MS =
"ozone.scm.stale.node.interval.ms";
public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L;
/** /**
* There is no need to instantiate this class. * There is no need to instantiate this class.
*/ */

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.UnresolvedTopologyException;
import java.io.Closeable;
import java.util.List;
/**
* A node manager supports a simple interface for managing a datanode.
* <p/>
* 1. A datanode registers with the NodeManager.
* <p/>
* 2. If the node is allowed to register, we add that to the nodes that we need
* to keep track of.
* <p/>
* 3. A heartbeat is made by the node at a fixed frequency.
* <p/>
* 4. A node can be in any of these 4 states: {HEALTHY, STALE, DEAD,
* DECOMMISSIONED}
* <p/>
* HEALTHY - It is a datanode that is regularly heartbeating us.
*
* STALE - A datanode for which we have missed few heart beats.
*
* DEAD - A datanode that we have not heard from for a while.
*
* DECOMMISSIONED - Someone told us to remove this node from the tracking
* list, by calling removeNode. We will throw away this nodes info soon.
*/
public interface NodeManager extends Closeable, Runnable {
/**
* Update the heartbeat timestamp.
*
* @param datanodeID - Name of the datanode that send us heatbeat.
*/
void updateHeartbeat(DatanodeID datanodeID);
/**
* Add a New Datanode to the NodeManager.
*
* @param nodeReg - Datanode ID.
* @throws UnresolvedTopologyException
*/
void registerNode(DatanodeID nodeReg)
throws UnresolvedTopologyException;
/**
* Removes a data node from the management of this Node Manager.
*
* @param node - DataNode.
* @throws UnregisteredNodeException
*/
void removeNode(DatanodeID node) throws UnregisteredNodeException;
/**
* Gets all Live Datanodes that is currently communicating with SCM.
*
* @return List of Datanodes that are Heartbeating SCM.
*/
List<DatanodeID> getNodes(NODESTATE nodestate);
/**
* Returns the Number of Datanodes that are communicating with SCM.
*
* @return int -- count
*/
int getNodeCount(NODESTATE nodestate);
/**
* Get all datanodes known to SCM.
*
* @return List of DatanodeIDs known to SCM.
*/
List<DatanodeID> getAllNodes();
/**
* Get the minimum number of nodes to get out of safe mode.
*
* @return int
*/
int getMinimumSafeModeNodes();
/**
* Reports if we have exited out of safe mode by discovering enough nodes.
*
* @return True if we are out of Node layer safe mode, false otherwise.
*/
boolean isOutOfNodeSafeMode();
/**
* Enum that represents the Node State. This is used in calls to getNodeList
* and getNodeCount. TODO: Add decommission when we support it.
*/
enum NODESTATE {
HEALTHY,
STALE,
DEAD
}
}

View File

@ -0,0 +1,513 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hadoop.util.Time.monotonicNow;
/**
* Maintains information about the Datanodes on SCM side.
* <p/>
* Heartbeats under SCM is very simple compared to HDFS heartbeatManager.
* <p/>
* Here we maintain 3 maps, and we propagate a node from healthyNodesMap to
* staleNodesMap to deadNodesMap. This moving of a node from one map to
* another is controlled by 4 configuration variables. These variables define
* how many heartbeats must go missing for the node to move from one map to
* another.
* <p/>
* Each heartbeat that SCMNodeManager receives is put into heartbeatQueue. The
* worker thread wakes up and grabs that heartbeat from the queue. The worker
* thread will lookup the healthynodes map and update the timestamp if the entry
* is there. if not it will look up stale and deadnodes map.
* <p/>
*
* TODO: Replace with Node Registration code.
* if the node is not found in any of these tables it is treated as new node for
* time being and added to the healthy nodes list.
*
* <p/>
*
* The getNode(byState) functions make copy of node maps and then creates a
* list based on that. It should be assumed that these get functions always
* report *stale* information. For example, getting the deadNodeCount
* followed by
* getNodes(DEAD) could very well produce totally different count. Also
* getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not
* guaranteed to add up to the total nodes that we know off. Please treat all
* get functions in this file as a snap-shot of information that is
* inconsistent as soon as you read it.
*/
public class SCMNodeManager implements NodeManager {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);
/**
* Key = NodeID, value = timestamp.
*/
private final Map<String, Long> healthyNodes;
private final Map<String, Long> staleNodes;
private final Map<String, Long> deadNodes;
private final Queue<DatanodeID> heartbeatQueue;
private final Map<String, DatanodeID> nodes;
private final AtomicInteger healthyNodeCount;
private final AtomicInteger staleNodeCount;
private final AtomicInteger deadNodeCount;
private final AtomicInteger totalNodes;
private final long staleNodeIntervalMs;
private final long deadNodeIntervalMs;
private final long heartbeatCheckerIntervalMs;
private final long datanodeHBIntervalSeconds;
private final ScheduledExecutorService executorService;
private long lastHBcheckStart;
private long lastHBcheckFinished = 0;
private long lastHBProcessedCount;
private int safeModeNodeCount;
private final int maxHBToProcessPerLoop;
/**
* Constructs SCM machine Manager.
*/
public SCMNodeManager(Configuration conf) {
heartbeatQueue = new ConcurrentLinkedQueue<>();
healthyNodes = new ConcurrentHashMap<>();
deadNodes = new ConcurrentHashMap<>();
staleNodes = new ConcurrentHashMap<>();
nodes = new HashMap<>();
healthyNodeCount = new AtomicInteger(0);
staleNodeCount = new AtomicInteger(0);
deadNodeCount = new AtomicInteger(0);
totalNodes = new AtomicInteger(0);
// TODO: Support this value as a Percentage of known machines.
safeModeNodeCount = 1;
staleNodeIntervalMs = OzoneClientUtils.getStaleNodeInterval(conf);
deadNodeIntervalMs = OzoneClientUtils.getDeadNodeInterval(conf);
heartbeatCheckerIntervalMs =
OzoneClientUtils.getScmheartbeatCheckerInterval(conf);
datanodeHBIntervalSeconds = OzoneClientUtils.getScmHeartbeatInterval(conf);
maxHBToProcessPerLoop = OzoneClientUtils.getMaxHBToProcessPerLoop(conf);
executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCM Heartbeat Processing Thread - %d").build());
Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
executorService.schedule(this, heartbeatCheckerIntervalMs,
TimeUnit.MILLISECONDS);
}
/**
* Add a New Datanode to the NodeManager. This function is invoked with
* synchronised(this) being held.
*
* @param nodeReg - node to register
*/
@Override
public void registerNode(DatanodeID nodeReg) {
if (nodes.containsKey(nodeReg.getDatanodeUuid())) {
LOG.error("Datanode is already registered. Datanode: {}",
nodeReg.toString());
return;
}
nodes.put(nodeReg.getDatanodeUuid(), nodeReg);
totalNodes.incrementAndGet();
healthyNodes.put(nodeReg.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
LOG.info("Data node with ID: {} Registered.", nodeReg.getDatanodeUuid());
}
/**
* Register the heartbeat with Machine Manager.
*
* This requires no synchronization since the heartbeat queue is
* ConcurrentLinkedQueue. Hence we don't protect it specifically via a lock.
*
* @param datanodeID - Name of the datanode that send us heartbeat.
*/
@Override
public void updateHeartbeat(DatanodeID datanodeID) {
// Checking for NULL to make sure that we don't get
// an exception from ConcurrentList.
// This could be a problem in tests, if this function is invoked via
// protobuf, transport layer will guarantee that this is not null.
if (datanodeID != null) {
heartbeatQueue.add(datanodeID);
return;
}
LOG.error("Datanode ID in heartbeat is null");
}
/**
* Removes a data node from the management of this Node Manager.
*
* @param node - DataNode.
* @throws UnregisteredNodeException
*/
@Override
public void removeNode(DatanodeID node) throws UnregisteredNodeException {
// TODO : Fix me.
}
/**
* Gets all datanodes that are in a certain state. This function works by
* taking a snapshot of the current collection and then returning the list
* from that collection. This means that real map might have changed by the
* time we return this list.
*
* @return List of Datanodes that are known to SCM in the requested state.
*/
@Override
public List<DatanodeID> getNodes(NODESTATE nodestate)
throws IllegalArgumentException{
Map<String, Long> set;
switch (nodestate) {
case HEALTHY:
synchronized (this) {
set = Collections.unmodifiableMap(new HashMap<>(healthyNodes));
}
break;
case STALE:
synchronized (this) {
set = Collections.unmodifiableMap(new HashMap<>(staleNodes));
}
break;
case DEAD:
synchronized (this) {
set = Collections.unmodifiableMap(new HashMap<>(deadNodes));
}
break;
default:
throw new IllegalArgumentException("Unknown node state requested.");
}
return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
.collect(Collectors.toList());
}
/**
* Returns all datanodes that are known to SCM.
*
* @return List of DatanodeIDs
*/
@Override
public List<DatanodeID> getAllNodes() {
Map<String, DatanodeID> set;
synchronized (this) {
set = Collections.unmodifiableMap(new HashMap<>(nodes));
}
return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
.collect(Collectors.toList());
}
/**
* Get the minimum number of nodes to get out of safe mode.
*
* @return int
*/
@Override
public int getMinimumSafeModeNodes() {
return safeModeNodeCount;
}
/**
* Sets the Minimum SafeModeNode count, used only in testing.
*
* @param count - Number of nodes.
*/
@VisibleForTesting
public void setMinimumSafeModeNodes(int count) {
safeModeNodeCount = count;
}
/**
* Reports if we have exited out of safe mode.
*
* @return true if we are out of safe mode.
*/
@Override
public boolean isOutOfNodeSafeMode() {
LOG.trace("Node count : {}", totalNodes.get());
//TODO : Support a boolean to force getting out of Safe mode.
return (totalNodes.get() >= getMinimumSafeModeNodes());
}
/**
* Returns the Number of Datanodes by State they are in.
*
* @return int -- count
*/
@Override
public int getNodeCount(NODESTATE nodestate) {
switch (nodestate) {
case HEALTHY:
return healthyNodeCount.get();
case STALE:
return staleNodeCount.get();
case DEAD:
return deadNodeCount.get();
default:
throw new IllegalArgumentException("Unknown node state requested.");
}
}
/**
* Used for testing.
* @return true if the HB check is done.
*/
@VisibleForTesting
public boolean waitForHeartbeatThead() {
return lastHBcheckFinished != 0;
}
/**
* This is the real worker thread that processes the HB queue. We do the
* following things in this thread.
*
* Process the Heartbeats that are in the HB Queue.
* Move Stale or Dead node to healthy if we got a heartbeat from them.
* Move Stales Node to dead node table if it is needed.
* Move healthy nodes to stale nodes if it is needed.
*
* if it is a new node, we call register node and add it to the list of nodes.
* This will be replaced when we support registration of a node in SCM.
* @see Thread#run()
*/
@Override
public void run() {
lastHBcheckStart = monotonicNow();
lastHBProcessedCount = 0;
// Process the whole queue.
while (!heartbeatQueue.isEmpty() &&
(lastHBProcessedCount < maxHBToProcessPerLoop)) {
DatanodeID datanodeID = heartbeatQueue.poll();
synchronized (this) {
handleHeartbeat(datanodeID);
}
// we are shutting down or something give up processing the rest of
// HBs. This will terminate the HB processing thread.
if (Thread.currentThread().isInterrupted()) {
LOG.info("Current Thread is isInterrupted, shutting down HB " +
"processing thread for Node Manager.");
return;
}
}
if (lastHBProcessedCount >= maxHBToProcessPerLoop) {
LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" +
" the heartbeat counts. Processed {} heartbeats. Breaking out of" +
" loop. Leaving rest to be processed later. ", lastHBProcessedCount);
}
// Iterate over the Stale nodes and decide if we need to move any node to
// dead State.
long currentTime = monotonicNow();
for (Map.Entry<String, Long> entry : staleNodes.entrySet()) {
if (currentTime - entry.getValue() > deadNodeIntervalMs) {
synchronized (this) {
moveStaleNodeToDead(entry);
}
}
}
// Iterate over the healthy nodes and decide if we need to move any node to
// Stale State.
currentTime = monotonicNow();
for (Map.Entry<String, Long> entry : healthyNodes.entrySet()) {
if (currentTime - entry.getValue() > staleNodeIntervalMs) {
synchronized (this) {
moveHealthyNodeToStale(entry);
}
}
}
lastHBcheckFinished = monotonicNow();
monitorHBProcessingTime();
// we purposefully make this non-deterministic. Instead of using a
// scheduleAtFixedFrequency we will just go to sleep
// and wake up at the next rendezvous point, which is currentTime +
// heartbeatCheckerIntervalMs. This leads to the issue that we are now
// heart beating not at a fixed cadence, but clock tick + time taken to
// work.
//
// This time taken to work can skew the heartbeat processor thread.
// The reason why we don't care is because of the following reasons.
//
// 1. checkerInterval is general many magnitudes faster than datanode HB
// frequency.
//
// 2. if we have too much nodes, the SCM would be doing only HB
// processing, this could lead to SCM's CPU starvation. With this
// approach we always guarantee that HB thread sleeps for a little while.
//
// 3. It is possible that we will never finish processing the HB's in the
// thread. But that means we have a mis-configured system. We will warn
// the users by logging that information.
//
// 4. And the most important reason, heartbeats are not blocked even if
// this thread does not run, they will go into the processing queue.
if (!Thread.currentThread().isInterrupted()) {
executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
.MILLISECONDS);
} else {
LOG.info("Current Thread is interrupted, shutting down HB processing " +
"thread for Node Manager.");
}
}
/**
* If we have taken too much time for HB processing, log that information.
*/
private void monitorHBProcessingTime() {
if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished -
lastHBcheckStart) > datanodeHBIntervalSeconds) {
LOG.error("Total time spend processing datanode HB's is greater than " +
"configured values for datanode heartbeats. Please adjust the" +
" heartbeat configs. Time Spend on HB processing: {} seconds " +
"Datanode heartbeat Interval: {} seconds , heartbeats " +
"processed: {}",
TimeUnit.MILLISECONDS
.toSeconds(lastHBcheckFinished - lastHBcheckStart),
datanodeHBIntervalSeconds, lastHBProcessedCount);
}
}
/**
* Moves a Healthy node to a Stale node state.
*
* @param entry - Map Entry
*/
private void moveHealthyNodeToStale(Map.Entry<String, Long> entry) {
LOG.trace("Moving healthy node to stale: {}", entry.getKey());
healthyNodes.remove(entry.getKey());
healthyNodeCount.decrementAndGet();
staleNodes.put(entry.getKey(), entry.getValue());
staleNodeCount.incrementAndGet();
}
/**
* Moves a Stale node to a dead node state.
*
* @param entry - Map Entry
*/
private void moveStaleNodeToDead(Map.Entry<String, Long> entry) {
LOG.trace("Moving stale node to dead: {}", entry.getKey());
staleNodes.remove(entry.getKey());
staleNodeCount.decrementAndGet();
deadNodes.put(entry.getKey(), entry.getValue());
deadNodeCount.incrementAndGet();
}
/**
* Handles a single heartbeat from a datanode.
*
* @param datanodeID - datanode ID.
*/
private void handleHeartbeat(DatanodeID datanodeID) {
lastHBProcessedCount++;
// If this node is already in the list of known and healthy nodes
// just update the last timestamp and return.
if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) {
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
return;
}
// A stale node has heartbeat us we need to remove the node from stale
// list and move to healthy list.
if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) {
staleNodes.remove(datanodeID.getDatanodeUuid());
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
staleNodeCount.decrementAndGet();
return;
}
// A dead node has heartbeat us, we need to remove that node from dead
// node list and move it to the healthy list.
if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) {
deadNodes.remove(datanodeID.getDatanodeUuid());
healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
deadNodeCount.decrementAndGet();
healthyNodeCount.incrementAndGet();
return;
}
registerNode(datanodeID);
}
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Unable to shutdown NodeManager properly.");
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
@VisibleForTesting
long getLastHBProcessedCount() {
return lastHBProcessedCount;
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.node;
/**
* The node package deals with node management.
* <p/>
* The node manager takes care of node registrations, removal of node and
* handling of heartbeats.
* <p/>
* The node manager maintains statistics that gets send as part of
* heartbeats.
* <p/>
* The container manager polls the node manager to learn the state of
* datanodes that it is interested in.
* <p/>
*/

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm;
/**
* This package contains Storage Container Manager classes.
*/

View File

@ -0,0 +1,864 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.test.GenericTestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test the Node Manager class.
*/
public class TestNodeManager {
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
}
/**
* Returns a new copy of Configuration.
*
* @return Config
*/
Configuration getConf() {
return new OzoneConfiguration();
}
/**
* Create a new datanode ID.
*
* @return DatanodeID
*/
DatanodeID getDatanodeID() {
return getDatanodeID(UUID.randomUUID().toString());
}
/**
* Create a new DatanodeID with NodeID set to the string.
*
* @param uuid - node ID, it is generally UUID.
* @return DatanodeID.
*/
DatanodeID getDatanodeID(String uuid) {
Random random = new Random();
String ipAddress = random.nextInt(256) + "."
+ random.nextInt(256) + "."
+ random.nextInt(256) + "."
+ random.nextInt(256);
String hostName = RandomStringUtils.randomAscii(8);
return new DatanodeID(ipAddress, hostName, uuid,
0, 0, 0, 0);
}
/**
* Creates a NodeManager.
*
* @param config - Config for the node manager.
* @return SCNNodeManager
* @throws IOException
*/
SCMNodeManager createNodeManager(Configuration config) throws IOException {
SCMNodeManager nodeManager = new SCMNodeManager(config);
assertFalse("Node manager should be in safe mode",
nodeManager.isOutOfNodeSafeMode());
return nodeManager;
}
/**
* Tests that Node manager handles heartbeats correctly, and comes out of Safe
* Mode.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmHeartbeat() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
// Send some heartbeats from different nodes.
for (int x = 0; x < nodeManager.getMinimumSafeModeNodes(); x++) {
nodeManager.updateHeartbeat(getDatanodeID());
}
// Wait for 4 seconds max.
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertTrue("Heartbeat thread should have picked up the scheduled " +
"heartbeats and transitioned out of safe mode.",
nodeManager.isOutOfNodeSafeMode());
}
}
/**
* asserts that if we send no heartbeats node manager stays in safemode.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmNoHeartbeats() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("No heartbeats, Node manager should have been in safe mode.",
nodeManager.isOutOfNodeSafeMode());
}
}
/**
* Asserts that if we don't get enough unique nodes we stay in safemode.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmNotEnoughHeartbeats() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
// Need 100 nodes to come out of safe mode, only one node is sending HB.
nodeManager.setMinimumSafeModeNodes(100);
nodeManager.updateHeartbeat(getDatanodeID());
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have been in " +
"safemode.", nodeManager.isOutOfNodeSafeMode());
}
}
/**
* Asserts that many heartbeat from the same node is counted as a single
* node.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmSameNodeHeartbeats() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
nodeManager.setMinimumSafeModeNodes(3);
DatanodeID datanodeID = getDatanodeID();
// Send 10 heartbeat from same node, and assert we never leave safe mode.
for (int x = 0; x < 10; x++) {
nodeManager.updateHeartbeat(datanodeID);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough nodes have send heartbeat to node manager.",
nodeManager.isOutOfNodeSafeMode());
}
}
/**
* Asserts that adding heartbeats after shutdown does not work. This implies
* that heartbeat thread has been shutdown safely by closing the node
* manager.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmShutdown() throws IOException, InterruptedException,
TimeoutException {
Configuration conf = getConf();
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
SCMNodeManager nodeManager = createNodeManager(conf);
nodeManager.close();
// These should never be processed.
nodeManager.updateHeartbeat(getDatanodeID());
// Let us just wait for 2 seconds to prove that HBs are not processed.
Thread.sleep(2 * 1000);
assertFalse("Node manager executor service is shutdown, should never exit" +
" safe mode", nodeManager.isOutOfNodeSafeMode());
assertEquals("Assert new HBs were never processed", 0,
nodeManager.getLastHBProcessedCount());
}
/**
* Asserts that we detect as many healthy nodes as we have generated heartbeat
* for.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmHealthyNodeCount() throws IOException,
InterruptedException, TimeoutException {
Configuration conf = getConf();
final int count = 10;
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
for (int x = 0; x < count; x++) {
nodeManager.updateHeartbeat(getDatanodeID());
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertEquals(count, nodeManager.getNodeCount(HEALTHY));
}
}
/**
* Asserts that if user provides a value less than 5 times the heartbeat
* interval as the StaleNode Value, we throw since that is a QoS that we
* cannot maintain.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmSanityOfUserConfig1() throws IOException,
InterruptedException, TimeoutException {
Configuration conf = getConf();
final int interval = 100;
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
// This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS
// and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, interval);
thrown.expect(IllegalArgumentException.class);
// This string is a multiple of the interval value
thrown.expectMessage(
startsWith("100 is not within min = 500 or max = 100000"));
createNodeManager(conf);
}
/**
* Asserts that if Stale Interval value is more than 5 times the value of HB
* processing thread it is a sane value.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmSanityOfUserConfig2() throws IOException,
InterruptedException, TimeoutException {
Configuration conf = getConf();
final int interval = 100;
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
// This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS
// and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
createNodeManager(conf).close();
}
/**
* Asserts that a single node moves from Healthy to stale node if it misses
* the heartbeat.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmDetectStaleNode() throws IOException,
InterruptedException, TimeoutException {
Configuration conf = getConf();
final int interval = 100;
final int nodeCount = 10;
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
// This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS
// and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> nodeList = new LinkedList<>();
DatanodeID staleNode = getDatanodeID();
for (int x = 0; x < nodeCount; x++) {
nodeList.add(getDatanodeID());
}
// Heartbeat once
nodeManager.updateHeartbeat(staleNode);
// Heartbeat all other nodes.
nodeList.forEach(nodeManager::updateHeartbeat);
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
nodeList.forEach(nodeManager::updateHeartbeat);
// Wait for 2 more seconds, 3 seconds is the stale window for this test
Thread.sleep(2 * 1000);
List<DatanodeID> staleNodeList = nodeManager.getNodes(NodeManager
.NODESTATE.STALE);
assertEquals("Expected to find 1 stale node", 1, nodeManager
.getNodeCount(STALE));
assertEquals("Expected to find 1 stale node", 1, staleNodeList.size());
assertEquals("Stale node is not the expected ID", staleNode
.getDatanodeUuid(), staleNodeList.get(0).getDatanodeUuid());
}
}
/**
* Asserts that a single node moves from Healthy to dead node if it misses
* enough heartbeats.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmDetectDeadNode() throws IOException,
InterruptedException, TimeoutException {
final int interval = 100;
final int nodeCount = 10;
Configuration conf = getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> nodeList = new LinkedList<>();
DatanodeID deadNode = getDatanodeID();
for (int x = 0; x < nodeCount; x++) {
nodeList.add(getDatanodeID());
}
// Heartbeat once
nodeManager.updateHeartbeat(deadNode);
// Heartbeat all other nodes.
nodeList.forEach(nodeManager::updateHeartbeat);
// Wait for 2 seconds .. and heartbeat good nodes again.
Thread.sleep(2 * 1000);
nodeList.forEach(nodeManager::updateHeartbeat);
Thread.sleep(3 * 1000);
// heartbeat good nodes again.
nodeList.forEach(nodeManager::updateHeartbeat);
// 6 seconds is the dead window for this test , so we wait a total of
// 7 seconds to make sure that the node moves into dead state.
Thread.sleep(2 * 1000);
// Check for the dead node now.
List<DatanodeID> deadNodeList = nodeManager
.getNodes(DEAD);
assertEquals("Expected to find 1 dead node", 1, nodeManager
.getNodeCount(DEAD));
assertEquals("Expected to find 1 dead node", 1, deadNodeList.size());
assertEquals("Dead node is not the expected ID", deadNode
.getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid());
}
}
/**
* Asserts that if we get duplicate registration calls for a datanode, we will
* ignore it and LOG the error.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmDuplicateRegistrationLogsError() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
DatanodeID duplicateNodeID = getDatanodeID();
nodeManager.registerNode(duplicateNodeID);
nodeManager.registerNode(duplicateNodeID);
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode is already" +
" registered."));
}
}
/**
* Asserts that we log an error for null in datanode ID.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmLogErrorOnNullDatanode() throws IOException,
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
nodeManager.updateHeartbeat(null);
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
"heartbeat is null"));
}
}
/**
* Asserts that a dead node, stale node and healthy nodes co-exist. The counts
* , lists and node ID match the expected node state.
* <p/>
* This test is pretty complicated because it explores all states of Node
* manager in a single test. Please read thru the comments to get an idea of
* the current state of the node Manager.
* <p/>
* This test is written like a state machine to avoid threads and concurrency
* issues. This test is replicated below with the use of threads. Avoiding
* threads make it easy to debug the state machine.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmClusterIsInExpectedState1() throws IOException,
InterruptedException, TimeoutException {
DatanodeID healthyNode = getDatanodeID("HealthyNode");
DatanodeID staleNode = getDatanodeID("StaleNode");
DatanodeID deadNode = getDatanodeID("DeadNode");
/**
* These values are very important. Here is what it means so you don't
* have to look it up while reading this code.
*
* OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS - This the frequency of the
* HB processing thread that is running in the SCM. This thread must run
* for the SCM to process the Heartbeats.
*
* OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS - This is the frequency at which
* datanodes will send heartbeats to SCM. Please note: This is the only
* config value for node manager that is specified in seconds. We don't
* want SCM heartbeat resolution to be more than in seconds.
* In this test it is not used, but we are forced to set it because we
* have validation code that checks Stale Node interval and Dead Node
* interval is larger than the value of
* OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS.
*
* OZONE_SCM_STALENODE_INTERVAL_MS - This is the time that must elapse
* from the last heartbeat for us to mark a node as stale. In this test
* we set that to 3. That is if a node has not heartbeat SCM for last 3
* seconds we will mark it as stale.
*
* OZONE_SCM_DEADNODE_INTERVAL_MS - This is the time that must elapse
* from the last heartbeat for a node to be marked dead. We have an
* additional constraint that this must be at least 2 times bigger than
* Stale node Interval.
*
* With these we are trying to explore the state of this cluster with
* various timeouts. Each section is commented so that you can keep
* track of the state of the cluster nodes.
*
*/
Configuration conf = getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
/**
* Cluster state: Healthy: All nodes are heartbeat-ing like normal.
*/
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
nodeManager.updateHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode);
nodeManager.updateHeartbeat(deadNode);
// Sleep so that heartbeat processing thread gets to run.
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(3, nodeManager.getNodeCount(HEALTHY));
/**
* Cluster state: Quiesced: We are going to sleep for 3 seconds. Which
* means that no node is heartbeating. All nodes should move to Stale.
*/
Thread.sleep(3 * 1000);
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(3, nodeManager.getNodeCount(STALE));
/**
* Cluster State : Move healthy node back to healthy state, move other 2
* nodes to Stale State.
*
* We heartbeat healthy node after 1 second and let other 2 nodes elapse
* the 3 second windows.
*/
nodeManager.updateHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode);
nodeManager.updateHeartbeat(deadNode);
Thread.sleep(1500);
nodeManager.updateHeartbeat(healthyNode);
Thread.sleep(2 * 1000);
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
// 3.5 seconds from last heartbeat for the stale and deadNode. So those
// 2 nodes must move to Stale state and the healthy node must
// remain in the healthy State.
List<DatanodeID> healthyList = nodeManager.getNodes(HEALTHY);
assertEquals("Expected one healthy node", 1, healthyList.size());
assertEquals("Healthy node is not the expected ID", healthyNode
.getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
assertEquals(2, nodeManager.getNodeCount(STALE));
/**
* Cluster State: Allow healthyNode to remain in healthy state and
* staleNode to move to stale state and deadNode to move to dead state.
*/
nodeManager.updateHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode);
Thread.sleep(1500);
nodeManager.updateHeartbeat(healthyNode);
Thread.sleep(2 * 1000);
// 3.5 seconds have elapsed for stale node, so it moves into Stale.
// 7 seconds have elapsed for dead node, so it moves into dead.
// 2 Seconds have elapsed for healthy node, so it stays in healhty state.
healthyList = nodeManager.getNodes(HEALTHY);
List<DatanodeID> staleList = nodeManager.getNodes(STALE);
List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(1, nodeManager.getNodeCount(HEALTHY));
assertEquals(1, nodeManager.getNodeCount(STALE));
assertEquals(1, nodeManager.getNodeCount(DEAD));
assertEquals("Expected one healthy node", 1, healthyList.size());
assertEquals("Healthy node is not the expected ID", healthyNode
.getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
assertEquals("Expected one stale node", 1, staleList.size());
assertEquals("Stale node is not the expected ID", staleNode
.getDatanodeUuid(), staleList.get(0).getDatanodeUuid());
assertEquals("Expected one dead node", 1, deadList.size());
assertEquals("Dead node is not the expected ID", deadNode
.getDatanodeUuid(), deadList.get(0).getDatanodeUuid());
/**
* Cluster State : let us heartbeat all the nodes and verify that we get
* back all the nodes in healthy state.
*/
nodeManager.updateHeartbeat(healthyNode);
nodeManager.updateHeartbeat(staleNode);
nodeManager.updateHeartbeat(deadNode);
Thread.sleep(500);
//Assert all nodes are healthy.
assertEquals(3, nodeManager.getAllNodes().size());
assertEquals(3, nodeManager.getNodeCount(HEALTHY));
}
}
/**
* Heartbeat a given set of nodes at a specified frequency.
*
* @param manager - Node Manager
* @param list - List of datanodeIDs
* @param sleepDuration - Duration to sleep between heartbeats.
* @throws InterruptedException
*/
private void heartbeatNodeSet(NodeManager manager, List<DatanodeID> list,
int sleepDuration) throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
list.forEach(manager::updateHeartbeat);
Thread.sleep(sleepDuration);
}
}
/**
* Create a set of Nodes with a given prefix.
*
* @param count - number of nodes.
* @param prefix - A prefix string that can be used in verification.
* @return List of Nodes.
*/
private List<DatanodeID> createNodeSet(int count, String prefix) {
List<DatanodeID> list = new LinkedList<>();
for (int x = 0; x < count; x++) {
list.add(getDatanodeID(prefix + x));
}
return list;
}
/**
* Function that tells us if we found the right number of stale nodes.
*
* @param nodeManager - node manager
* @param count - number of stale nodes to look for.
* @return true if we found the expected number.
*/
private boolean findNodes(NodeManager nodeManager, int count,
NodeManager.NODESTATE state) {
return count == nodeManager.getNodeCount(state);
}
/**
* Asserts that we can create a set of nodes that send its heartbeats from
* different threads and NodeManager behaves as expected.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testScmClusterIsInExpectedState2() throws IOException,
InterruptedException, TimeoutException {
final int healthyCount = 5000;
final int staleCount = 100;
final int deadCount = 10;
Configuration conf = getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000);
List<DatanodeID> healthyNodeList = createNodeSet(healthyCount, "Healthy");
List<DatanodeID> staleNodeList = createNodeSet(staleCount, "Stale");
List<DatanodeID> deadNodeList = createNodeSet(deadCount, "Dead");
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
Runnable healthyNodeTask = () -> {
try {
// 2 second heartbeat makes these nodes stay healthy.
heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000);
} catch (InterruptedException ignored) {
}
};
Runnable staleNodeTask = () -> {
try {
// 4 second heartbeat makes these nodes go to stale and back to
// healthy again.
heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000);
} catch (InterruptedException ignored) {
}
};
// No Thread just one time HBs the node manager, so that these will be
// marked as dead nodes eventually.
deadNodeList.forEach(nodeManager::updateHeartbeat);
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
thread1.start();
Thread thread2 = new Thread(staleNodeTask);
thread2.setDaemon(true);
thread2.start();
Thread.sleep(10 * 1000);
// Assert all healthy nodes are healthy now, this has to be a greater
// than check since Stale nodes can be healthy when we check the state.
assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount);
assertEquals(deadCount, nodeManager.getNodeCount(DEAD));
List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
for (DatanodeID node : deadList) {
assertThat(node.getDatanodeUuid(), CoreMatchers.startsWith("Dead"));
}
// Checking stale nodes is tricky since they have to move between
// healthy and stale to avoid becoming dead nodes. So we search for
// that state for a while, if we don't find that state waitfor will
// throw.
GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
500, 4 * 1000);
thread1.interrupt();
thread2.interrupt();
}
}
/**
* Asserts that we can handle 6000+ nodes heartbeating SCM.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testScmCanHandleScale() throws IOException,
InterruptedException, TimeoutException {
final int healthyCount = 3000;
final int staleCount = 3000;
List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
List<DatanodeID> staleList = createNodeSet(staleCount, "s");
Configuration conf = getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
Runnable healthyNodeTask = () -> {
try {
heartbeatNodeSet(nodeManager, healthyList, 2 * 1000);
} catch (InterruptedException ignored) {
}
};
Runnable staleNodeTask = () -> {
try {
heartbeatNodeSet(nodeManager, staleList, 4 * 1000);
} catch (InterruptedException ignored) {
}
};
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
thread1.start();
Thread thread2 = new Thread(staleNodeTask);
thread2.setDaemon(true);
thread2.start();
Thread.sleep(3 * 1000);
GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
500, 20 * 1000);
assertEquals("Node count mismatch", healthyCount + staleCount, nodeManager
.getAllNodes().size());
thread1.interrupt();
thread2.interrupt();
}
}
/**
* Asserts that SCM backs off from HB processing instead of going into an
* infinite loop if SCM is flooded with too many heartbeats. This many not be
* the best thing to do, but SCM tries to protect itself and logs an error
* saying that it is getting flooded with heartbeats. In real world this can
* lead to many nodes becoming stale or dead due to the fact that SCM is not
* able to keep up with heartbeat processing. This test just verifies that SCM
* will log that information.
*/
@Test
public void testScmLogsHeartbeatFlooding() throws IOException,
InterruptedException {
final int healthyCount = 3000;
List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
// Make the HB process thread run slower.
Configuration conf = getConf();
conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 500);
conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
Runnable healthyNodeTask = () -> {
try {
// No wait in the HB sending loop.
heartbeatNodeSet(nodeManager, healthyList, 0);
} catch (InterruptedException ignored) {
}
};
Thread thread1 = new Thread(healthyNodeTask);
thread1.setDaemon(true);
thread1.start();
Thread.sleep(6 * 1000);
thread1.interrupt();
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("SCM is being " +
"flooded by heartbeats. Not able to keep up with the heartbeat " +
"counts."));
}
}
}