From 71c9b5be7b6c5585498e804f07f6d6d8be317769 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Thu, 29 Sep 2016 10:36:25 -0700 Subject: [PATCH] HDFS-10897. Ozone: SCM: Add NodeManager. Contributed by Anu Engineer. --- .../apache/hadoop/ozone/OzoneClientUtils.java | 140 +++ .../apache/hadoop/ozone/OzoneConfigKeys.java | 25 + .../hadoop/ozone/scm/node/NodeManager.java | 120 +++ .../hadoop/ozone/scm/node/SCMNodeManager.java | 513 +++++++++++ .../hadoop/ozone/scm/node/package-info.java | 31 + .../apache/hadoop/ozone/scm/package-info.java | 22 + .../ozone/scm/node/TestNodeManager.java | 864 ++++++++++++++++++ 7 files changed, 1715 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java index 80a2d33ca2a..549dc804c54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java @@ -32,6 +32,12 @@ import java.util.HashMap; import java.util.Map; import static org.apache.hadoop.ozone.OzoneConfigKeys.*; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS; /** * Utility methods for Ozone and Container Clients. @@ -238,4 +244,138 @@ public final class OzoneClientUtils { services.put(OZONE_SCM_SERVICE_ID, serviceInstances); return services; } + + /** + * Checks that a given value is with a range. + * + * For example, sanitizeUserArgs(17, 3, 5, 10) + * ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10. + * + * @param valueTocheck - value to check + * @param baseValue - the base value that is being used. + * @param minFactor - range min - a 2 here makes us ensure that value + * valueTocheck is at least twice the baseValue. + * @param maxFactor - range max + * @return long + */ + private static long sanitizeUserArgs(long valueTocheck, long baseValue, + long minFactor, long maxFactor) + throws IllegalArgumentException { + if ((valueTocheck >= (baseValue * minFactor)) && + (valueTocheck <= (baseValue * maxFactor))) { + return valueTocheck; + } + String errMsg = String.format("%d is not within min = %d or max = " + + "%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor); + throw new IllegalArgumentException(errMsg); + } + + + /** + * Returns the interval in which the heartbeat processor thread runs. 
+ * + * @param conf - Configuration + * @return long in Milliseconds. + */ + public static long getScmheartbeatCheckerInterval(Configuration conf) { + return conf.getLong(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, + OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT); + } + + + /** + * Heartbeat Interval - Defines the heartbeat frequency from a datanode to + * SCM. + * + * @param conf - Ozone Config + * @return - HB interval in seconds. + */ + public static int getScmHeartbeatInterval(Configuration conf) { + return conf.getInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, + OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT); + } + + + /** + * Get the Stale Node interval, which is used by SCM to flag a datanode as + * stale, if the heartbeat from that node has been missing for this duration. + * + * @param conf - Configuration. + * @return - Long, Milliseconds to wait before flagging a node as stale. + */ + public static long getStaleNodeInterval(Configuration conf) { + + long staleNodeIntevalMs = conf.getLong(OZONE_SCM_STALENODE_INTERVAL_MS, + OZONE_SCM_STALENODE_INTERVAL_DEFAULT); + + long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf); + + long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000; + + + // Make sure that StaleNodeInterval is configured way above the frequency + // at which we run the heartbeat thread. + // + // Here we check that staleNodeInterval is at least five times more than the + // frequency at which the accounting thread is going to run. + try { + sanitizeUserArgs(staleNodeIntevalMs, heartbeatThreadFrequencyMs, 5, 1000); + } catch (IllegalArgumentException ex) { + LOG.error("Stale Node Interval MS is cannot be honored due to " + + "mis-configured {}. ex: {}", + OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, ex); + throw ex; + } + + // Make sure that stale node value is greater than configured value that + // datanodes are going to send HBs. + try { + sanitizeUserArgs(staleNodeIntevalMs, heartbeatIntervalMs, 3, 1000); + } catch (IllegalArgumentException ex) { + LOG.error("Stale Node Interval MS is cannot be honored due to " + + "mis-configured {}. ex: {}", + OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, ex); + throw ex; + } + return staleNodeIntevalMs; + } + + + /** + * Gets the interval for dead node flagging. This has to be a value that is + * greater than stale node value, and by transitive relation we also know + * that this value is greater than heartbeat interval and heartbeatProcess + * Interval. + * + * @param conf + * @return + */ + public static long getDeadNodeInterval(Configuration conf) { + long staleNodeIntervalMs = getStaleNodeInterval(conf); + long deadNodeIntervalMs = conf.getLong( + OZONE_SCM_DEADNODE_INTERVAL_MS, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT); + + try { + // Make sure that dead nodes Ms is at least twice the time for staleNodes + // with a max of 1000 times the staleNodes. + sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000); + } catch (IllegalArgumentException ex) { + LOG.error("Dead Node Interval MS is cannot be honored due to " + + "mis-configured {}. ex: {}", + OZONE_SCM_STALENODE_INTERVAL_MS, ex); + throw ex; + } + return deadNodeIntervalMs; + } + + /** + * Returns the maximum number of heartbeat to process per loop of the process + * thread. 
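As an aside (illustrative, not part of this patch): the interval getters above enforce an ordering between the heartbeat, stale-node and dead-node settings. A minimal sketch of how the new keys and getters fit together, using made-up values that satisfy the factor checks:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;

// Illustrative sketch only; not part of this patch.
public final class ScmIntervalExample {
  public static void main(String[] args) {
    Configuration conf = new OzoneConfiguration();
    // Heartbeat processing thread wakes up every 100 ms.
    conf.setLong("ozone.scm.heartbeat.thread.interval.ms", 100);
    // Datanodes heartbeat once per second.
    conf.setInt("ozone.scm.heartbeat.interval.seconds", 1);
    // Stale interval must be >= 5 x the checker interval and >= 3 x the
    // datanode heartbeat interval.
    conf.setLong("ozone.scm.stale.node.interval.ms", 3 * 1000);
    // Dead interval must be at least 2 x the stale interval.
    conf.setLong("ozone.scm.dead.node.interval.ms", 6 * 1000);

    long staleMs = OzoneClientUtils.getStaleNodeInterval(conf); // 3000
    long deadMs = OzoneClientUtils.getDeadNodeInterval(conf);   // 6000
    System.out.println("stale=" + staleMs + "ms, dead=" + deadMs + "ms");
  }
}
```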
+ * @param conf Configration + * @return - int -- Number of HBs to process + */ + public static int getMaxHBToProcessPerLoop(Configuration conf){ + return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, + OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 7c95fe7ccc0..ec133aa4b62 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -70,6 +70,31 @@ public final class OzoneConfigKeys { "ozone.scm.handler.count.key"; public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10; + public static final String OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS = + "ozone.scm.heartbeat.interval.seconds"; + public static final int OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT = + 30; + + public static final String OZONE_SCM_DEADNODE_INTERVAL_MS = + "ozone.scm.dead.node.interval.ms"; + public static final long OZONE_SCM_DEADNODE_INTERVAL_DEFAULT = + OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 20L; + + public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS = + "ozone.scm.max.hb.count.to.process"; + public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000; + + public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS = + "ozone.scm.heartbeat.thread.interval.ms"; + public static final long OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT = + 3000; + + public static final String OZONE_SCM_STALENODE_INTERVAL_MS = + "ozone.scm.stale.node.interval.ms"; + public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT = + OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L; + + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java new file mode 100644 index 00000000000..699c789e5d7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.node; + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.server.blockmanagement.UnresolvedTopologyException; + +import java.io.Closeable; +import java.util.List; + +/** + * A node manager supports a simple interface for managing a datanode. + *
+ * 1. A datanode registers with the NodeManager.
+ *
+ * 2. If the node is allowed to register, we add it to the set of nodes that
+ * we need to keep track of.
+ *
+ * 3. The node sends heartbeats at a fixed frequency.
+ *
+ * 4. A node can be in any of these 4 states: {HEALTHY, STALE, DEAD,
+ * DECOMMISSIONED}
+ *
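As an editorial aside (not part of this patch), here is roughly how a caller drives the lifecycle above; the per-state descriptions continue below. The wiring mirrors the unit tests added later in this patch, and the example class name is hypothetical:

```java
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;

// Illustrative sketch only; not part of this patch.
public final class NodeManagerLifecycleExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new OzoneConfiguration();
    try (SCMNodeManager nodeManager = new SCMNodeManager(conf)) {
      // 1 & 2: register a datanode so SCM starts tracking it.
      DatanodeID datanode = new DatanodeID("127.0.0.1", "host1",
          UUID.randomUUID().toString(), 0, 0, 0, 0);
      nodeManager.registerNode(datanode);

      // 3: the datanode (via its RPC handler) reports heartbeats periodically.
      nodeManager.updateHeartbeat(datanode);

      // 4: consumers query node state and safe-mode status.
      System.out.println("healthy="
          + nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY)
          + " outOfSafeMode=" + nodeManager.isOutOfNodeSafeMode());
    }
  }
}
```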
+ * HEALTHY - It is a datanode that is regularly heartbeating us. + * + * STALE - A datanode for which we have missed few heart beats. + * + * DEAD - A datanode that we have not heard from for a while. + * + * DECOMMISSIONED - Someone told us to remove this node from the tracking + * list, by calling removeNode. We will throw away this nodes info soon. + */ +public interface NodeManager extends Closeable, Runnable { + + /** + * Update the heartbeat timestamp. + * + * @param datanodeID - Name of the datanode that send us heatbeat. + */ + void updateHeartbeat(DatanodeID datanodeID); + + /** + * Add a New Datanode to the NodeManager. + * + * @param nodeReg - Datanode ID. + * @throws UnresolvedTopologyException + */ + void registerNode(DatanodeID nodeReg) + throws UnresolvedTopologyException; + + /** + * Removes a data node from the management of this Node Manager. + * + * @param node - DataNode. + * @throws UnregisteredNodeException + */ + void removeNode(DatanodeID node) throws UnregisteredNodeException; + + /** + * Gets all Live Datanodes that is currently communicating with SCM. + * + * @return List of Datanodes that are Heartbeating SCM. + */ + + List getNodes(NODESTATE nodestate); + + /** + * Returns the Number of Datanodes that are communicating with SCM. + * + * @return int -- count + */ + int getNodeCount(NODESTATE nodestate); + + /** + * Get all datanodes known to SCM. + * + * @return List of DatanodeIDs known to SCM. + */ + List getAllNodes(); + + /** + * Get the minimum number of nodes to get out of safe mode. + * + * @return int + */ + int getMinimumSafeModeNodes(); + + /** + * Reports if we have exited out of safe mode by discovering enough nodes. + * + * @return True if we are out of Node layer safe mode, false otherwise. + */ + boolean isOutOfNodeSafeMode(); + + /** + * Enum that represents the Node State. This is used in calls to getNodeList + * and getNodeCount. TODO: Add decommission when we support it. + */ + enum NODESTATE { + HEALTHY, + STALE, + DEAD + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java new file mode 100644 index 00000000000..e866dbcfc87 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -0,0 +1,513 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.node; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.hadoop.util.Time.monotonicNow; + +/** + * Maintains information about the Datanodes on SCM side. + *
+ * Heartbeat handling in SCM is much simpler than the HDFS HeartbeatManager.
+ *
+ * Here we maintain three maps, and we propagate a node from healthyNodesMap
+ * to staleNodesMap to deadNodesMap. This movement of a node from one map to
+ * another is controlled by four configuration variables, which define how
+ * long heartbeats may go missing before a node moves from one map to the
+ * next.
+ *
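A stripped-down sketch of that timestamp-driven promotion, separate from the class itself (class and field names here are hypothetical):

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; not part of this patch.
final class NodeAgingSketch {
  final Map<String, Long> healthyNodes = new ConcurrentHashMap<>();
  final Map<String, Long> staleNodes = new ConcurrentHashMap<>();
  final Map<String, Long> deadNodes = new ConcurrentHashMap<>();

  /** Age nodes one level per pass: stale -> dead, then healthy -> stale. */
  void age(long nowMs, long staleIntervalMs, long deadIntervalMs) {
    for (Iterator<Map.Entry<String, Long>> it =
         staleNodes.entrySet().iterator(); it.hasNext();) {
      Map.Entry<String, Long> e = it.next();
      if (nowMs - e.getValue() > deadIntervalMs) {
        it.remove();
        deadNodes.put(e.getKey(), e.getValue());
      }
    }
    for (Iterator<Map.Entry<String, Long>> it =
         healthyNodes.entrySet().iterator(); it.hasNext();) {
      Map.Entry<String, Long> e = it.next();
      if (nowMs - e.getValue() > staleIntervalMs) {
        it.remove();
        staleNodes.put(e.getKey(), e.getValue());
      }
    }
  }

  public static void main(String[] args) {
    NodeAgingSketch sketch = new NodeAgingSketch();
    sketch.healthyNodes.put("dn-1", 0L);            // last seen at t = 0
    sketch.age(5_000, 3_000, 6_000);                // at t = 5s: stale, not dead
    System.out.println(sketch.staleNodes.keySet()); // prints [dn-1]
  }
}
```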
+ * Each heartbeat that SCMNodeManager receives is put into the heartbeatQueue.
+ * The worker thread wakes up and pulls heartbeats from that queue. For each
+ * heartbeat it looks up the healthyNodes map and updates the timestamp if the
+ * entry is there; if not, it looks up the stale and dead node maps.
+ *
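That producer/consumer arrangement (lock-free enqueue on the RPC path, periodic drain by a single scheduled worker) can be sketched in isolation as follows. The names are hypothetical, and for brevity this sketch uses a fixed-delay schedule, whereas SCMNodeManager re-arms itself with one-shot schedule() calls for the reasons spelled out in run() below:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not part of this patch.
final class HeartbeatPumpSketch {
  private final Queue<String> heartbeatQueue = new ConcurrentLinkedQueue<>();
  private final ScheduledExecutorService worker =
      Executors.newSingleThreadScheduledExecutor();

  /** Called from RPC handlers; ConcurrentLinkedQueue needs no extra locking. */
  void offerHeartbeat(String datanodeUuid) {
    heartbeatQueue.add(datanodeUuid);
  }

  /** Drain the queue on a fixed cadence and stamp each node as seen. */
  void start(long intervalMs) {
    worker.scheduleWithFixedDelay(() -> {
      String uuid;
      while ((uuid = heartbeatQueue.poll()) != null) {
        // Update the last-seen timestamp for this uuid here.
      }
    }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
  }

  void stop() {
    worker.shutdownNow();
  }
}
```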
+ *
+ * TODO: Replace with Node Registration code.
+ * If the node is not found in any of these maps, it is treated as a new node
+ * for the time being and added to the healthy nodes list.
+ *
+ * + * The getNode(byState) functions make copy of node maps and then creates a + * list based on that. It should be assumed that these get functions always + * report *stale* information. For example, getting the deadNodeCount + * followed by + * getNodes(DEAD) could very well produce totally different count. Also + * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not + * guaranteed to add up to the total nodes that we know off. Please treat all + * get functions in this file as a snap-shot of information that is + * inconsistent as soon as you read it. + */ +public class SCMNodeManager implements NodeManager { + + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMNodeManager.class); + /** + * Key = NodeID, value = timestamp. + */ + private final Map healthyNodes; + private final Map staleNodes; + private final Map deadNodes; + private final Queue heartbeatQueue; + private final Map nodes; + private final AtomicInteger healthyNodeCount; + private final AtomicInteger staleNodeCount; + private final AtomicInteger deadNodeCount; + private final AtomicInteger totalNodes; + private final long staleNodeIntervalMs; + private final long deadNodeIntervalMs; + private final long heartbeatCheckerIntervalMs; + private final long datanodeHBIntervalSeconds; + private final ScheduledExecutorService executorService; + private long lastHBcheckStart; + private long lastHBcheckFinished = 0; + private long lastHBProcessedCount; + private int safeModeNodeCount; + private final int maxHBToProcessPerLoop; + + /** + * Constructs SCM machine Manager. + */ + public SCMNodeManager(Configuration conf) { + heartbeatQueue = new ConcurrentLinkedQueue<>(); + healthyNodes = new ConcurrentHashMap<>(); + deadNodes = new ConcurrentHashMap<>(); + staleNodes = new ConcurrentHashMap<>(); + nodes = new HashMap<>(); + + healthyNodeCount = new AtomicInteger(0); + staleNodeCount = new AtomicInteger(0); + deadNodeCount = new AtomicInteger(0); + totalNodes = new AtomicInteger(0); + + // TODO: Support this value as a Percentage of known machines. + safeModeNodeCount = 1; + + staleNodeIntervalMs = OzoneClientUtils.getStaleNodeInterval(conf); + deadNodeIntervalMs = OzoneClientUtils.getDeadNodeInterval(conf); + heartbeatCheckerIntervalMs = + OzoneClientUtils.getScmheartbeatCheckerInterval(conf); + datanodeHBIntervalSeconds = OzoneClientUtils.getScmHeartbeatInterval(conf); + maxHBToProcessPerLoop = OzoneClientUtils.getMaxHBToProcessPerLoop(conf); + + executorService = HadoopExecutors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); + + Preconditions.checkState(heartbeatCheckerIntervalMs > 0); + executorService.schedule(this, heartbeatCheckerIntervalMs, + TimeUnit.MILLISECONDS); + } + + /** + * Add a New Datanode to the NodeManager. This function is invoked with + * synchronised(this) being held. + * + * @param nodeReg - node to register + */ + @Override + public void registerNode(DatanodeID nodeReg) { + if (nodes.containsKey(nodeReg.getDatanodeUuid())) { + LOG.error("Datanode is already registered. Datanode: {}", + nodeReg.toString()); + return; + } + nodes.put(nodeReg.getDatanodeUuid(), nodeReg); + totalNodes.incrementAndGet(); + healthyNodes.put(nodeReg.getDatanodeUuid(), monotonicNow()); + healthyNodeCount.incrementAndGet(); + LOG.info("Data node with ID: {} Registered.", nodeReg.getDatanodeUuid()); + } + + /** + * Register the heartbeat with Machine Manager. 
+ * + * This requires no synchronization since the heartbeat queue is + * ConcurrentLinkedQueue. Hence we don't protect it specifically via a lock. + * + * @param datanodeID - Name of the datanode that send us heartbeat. + */ + @Override + public void updateHeartbeat(DatanodeID datanodeID) { + // Checking for NULL to make sure that we don't get + // an exception from ConcurrentList. + // This could be a problem in tests, if this function is invoked via + // protobuf, transport layer will guarantee that this is not null. + if (datanodeID != null) { + heartbeatQueue.add(datanodeID); + return; + } + LOG.error("Datanode ID in heartbeat is null"); + } + + /** + * Removes a data node from the management of this Node Manager. + * + * @param node - DataNode. + * @throws UnregisteredNodeException + */ + @Override + public void removeNode(DatanodeID node) throws UnregisteredNodeException { + // TODO : Fix me. + + } + + /** + * Gets all datanodes that are in a certain state. This function works by + * taking a snapshot of the current collection and then returning the list + * from that collection. This means that real map might have changed by the + * time we return this list. + * + * @return List of Datanodes that are known to SCM in the requested state. + */ + @Override + public List getNodes(NODESTATE nodestate) + throws IllegalArgumentException{ + Map set; + switch (nodestate) { + case HEALTHY: + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(healthyNodes)); + } + break; + case STALE: + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(staleNodes)); + } + break; + case DEAD: + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(deadNodes)); + } + break; + default: + throw new IllegalArgumentException("Unknown node state requested."); + } + + return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) + .collect(Collectors.toList()); + } + + /** + * Returns all datanodes that are known to SCM. + * + * @return List of DatanodeIDs + */ + @Override + public List getAllNodes() { + Map set; + synchronized (this) { + set = Collections.unmodifiableMap(new HashMap<>(nodes)); + } + return set.entrySet().stream().map(entry -> nodes.get(entry.getKey())) + .collect(Collectors.toList()); + } + + /** + * Get the minimum number of nodes to get out of safe mode. + * + * @return int + */ + @Override + public int getMinimumSafeModeNodes() { + return safeModeNodeCount; + } + + /** + * Sets the Minimum SafeModeNode count, used only in testing. + * + * @param count - Number of nodes. + */ + @VisibleForTesting + public void setMinimumSafeModeNodes(int count) { + safeModeNodeCount = count; + } + + /** + * Reports if we have exited out of safe mode. + * + * @return true if we are out of safe mode. + */ + @Override + public boolean isOutOfNodeSafeMode() { + LOG.trace("Node count : {}", totalNodes.get()); + + //TODO : Support a boolean to force getting out of Safe mode. + return (totalNodes.get() >= getMinimumSafeModeNodes()); + } + + /** + * Returns the Number of Datanodes by State they are in. + * + * @return int -- count + */ + @Override + public int getNodeCount(NODESTATE nodestate) { + switch (nodestate) { + case HEALTHY: + return healthyNodeCount.get(); + case STALE: + return staleNodeCount.get(); + case DEAD: + return deadNodeCount.get(); + default: + throw new IllegalArgumentException("Unknown node state requested."); + } + } + + /** + * Used for testing. + * @return true if the HB check is done. 
+ */ + @VisibleForTesting + public boolean waitForHeartbeatThead() { + return lastHBcheckFinished != 0; + } + + /** + * This is the real worker thread that processes the HB queue. We do the + * following things in this thread. + * + * Process the Heartbeats that are in the HB Queue. + * Move Stale or Dead node to healthy if we got a heartbeat from them. + * Move Stales Node to dead node table if it is needed. + * Move healthy nodes to stale nodes if it is needed. + * + * if it is a new node, we call register node and add it to the list of nodes. + * This will be replaced when we support registration of a node in SCM. + * @see Thread#run() + */ + @Override + public void run() { + lastHBcheckStart = monotonicNow(); + lastHBProcessedCount = 0; + + // Process the whole queue. + while (!heartbeatQueue.isEmpty() && + (lastHBProcessedCount < maxHBToProcessPerLoop)) { + DatanodeID datanodeID = heartbeatQueue.poll(); + synchronized (this) { + handleHeartbeat(datanodeID); + } + // we are shutting down or something give up processing the rest of + // HBs. This will terminate the HB processing thread. + if (Thread.currentThread().isInterrupted()) { + LOG.info("Current Thread is isInterrupted, shutting down HB " + + "processing thread for Node Manager."); + return; + } + } + + if (lastHBProcessedCount >= maxHBToProcessPerLoop) { + LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" + + " the heartbeat counts. Processed {} heartbeats. Breaking out of" + + " loop. Leaving rest to be processed later. ", lastHBProcessedCount); + } + + // Iterate over the Stale nodes and decide if we need to move any node to + // dead State. + long currentTime = monotonicNow(); + for (Map.Entry entry : staleNodes.entrySet()) { + if (currentTime - entry.getValue() > deadNodeIntervalMs) { + synchronized (this) { + moveStaleNodeToDead(entry); + } + } + } + + // Iterate over the healthy nodes and decide if we need to move any node to + // Stale State. + currentTime = monotonicNow(); + for (Map.Entry entry : healthyNodes.entrySet()) { + if (currentTime - entry.getValue() > staleNodeIntervalMs) { + synchronized (this) { + moveHealthyNodeToStale(entry); + } + } + } + lastHBcheckFinished = monotonicNow(); + + monitorHBProcessingTime(); + + // we purposefully make this non-deterministic. Instead of using a + // scheduleAtFixedFrequency we will just go to sleep + // and wake up at the next rendezvous point, which is currentTime + + // heartbeatCheckerIntervalMs. This leads to the issue that we are now + // heart beating not at a fixed cadence, but clock tick + time taken to + // work. + // + // This time taken to work can skew the heartbeat processor thread. + // The reason why we don't care is because of the following reasons. + // + // 1. checkerInterval is general many magnitudes faster than datanode HB + // frequency. + // + // 2. if we have too much nodes, the SCM would be doing only HB + // processing, this could lead to SCM's CPU starvation. With this + // approach we always guarantee that HB thread sleeps for a little while. + // + // 3. It is possible that we will never finish processing the HB's in the + // thread. But that means we have a mis-configured system. We will warn + // the users by logging that information. + // + // 4. And the most important reason, heartbeats are not blocked even if + // this thread does not run, they will go into the processing queue. 
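Editorial aside (illustrative, not part of this patch): the one-shot re-scheduling idiom argued for in the comment above, distilled into a standalone sketch:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not part of this patch.
final class SelfReschedulingTask implements Runnable {
  private final ScheduledExecutorService exec =
      Executors.newSingleThreadScheduledExecutor();
  private final long intervalMs;

  SelfReschedulingTask(long intervalMs) {
    this.intervalMs = intervalMs;
  }

  void start() {
    exec.schedule(this, intervalMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public void run() {
    // ... one round of work; it may take a variable amount of time ...

    // Re-arm only after the work finishes, so the worker always rests for a
    // full interval even when a round runs long, unlike scheduleAtFixedRate.
    exec.schedule(this, intervalMs, TimeUnit.MILLISECONDS);
  }
}
```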
+ + if (!Thread.currentThread().isInterrupted()) { + executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit + .MILLISECONDS); + } else { + LOG.info("Current Thread is interrupted, shutting down HB processing " + + "thread for Node Manager."); + } + } + + /** + * If we have taken too much time for HB processing, log that information. + */ + private void monitorHBProcessingTime() { + if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished - + lastHBcheckStart) > datanodeHBIntervalSeconds) { + LOG.error("Total time spend processing datanode HB's is greater than " + + "configured values for datanode heartbeats. Please adjust the" + + " heartbeat configs. Time Spend on HB processing: {} seconds " + + "Datanode heartbeat Interval: {} seconds , heartbeats " + + "processed: {}", + TimeUnit.MILLISECONDS + .toSeconds(lastHBcheckFinished - lastHBcheckStart), + datanodeHBIntervalSeconds, lastHBProcessedCount); + } + } + + /** + * Moves a Healthy node to a Stale node state. + * + * @param entry - Map Entry + */ + private void moveHealthyNodeToStale(Map.Entry entry) { + LOG.trace("Moving healthy node to stale: {}", entry.getKey()); + healthyNodes.remove(entry.getKey()); + healthyNodeCount.decrementAndGet(); + staleNodes.put(entry.getKey(), entry.getValue()); + staleNodeCount.incrementAndGet(); + } + + /** + * Moves a Stale node to a dead node state. + * + * @param entry - Map Entry + */ + private void moveStaleNodeToDead(Map.Entry entry) { + LOG.trace("Moving stale node to dead: {}", entry.getKey()); + staleNodes.remove(entry.getKey()); + staleNodeCount.decrementAndGet(); + deadNodes.put(entry.getKey(), entry.getValue()); + deadNodeCount.incrementAndGet(); + } + + /** + * Handles a single heartbeat from a datanode. + * + * @param datanodeID - datanode ID. + */ + private void handleHeartbeat(DatanodeID datanodeID) { + lastHBProcessedCount++; + + // If this node is already in the list of known and healthy nodes + // just update the last timestamp and return. + if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) { + healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + return; + } + + // A stale node has heartbeat us we need to remove the node from stale + // list and move to healthy list. + if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) { + staleNodes.remove(datanodeID.getDatanodeUuid()); + healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + healthyNodeCount.incrementAndGet(); + staleNodeCount.decrementAndGet(); + return; + } + + // A dead node has heartbeat us, we need to remove that node from dead + // node list and move it to the healthy list. + if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) { + deadNodes.remove(datanodeID.getDatanodeUuid()); + healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); + deadNodeCount.decrementAndGet(); + healthyNodeCount.incrementAndGet(); + return; + } + + registerNode(datanodeID); + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. 
+ * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Unable to shutdown NodeManager properly."); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + @VisibleForTesting + long getLastHBProcessedCount() { + return lastHBProcessedCount; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/package-info.java new file mode 100644 index 00000000000..f1efe799106 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/package-info.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.node; + +/** + * The node package deals with node management. + *
+ * The node manager takes care of node registration, node removal and the
+ * handling of heartbeats.
+ *
+ * The node manager maintains statistics that are sent as part of
+ * heartbeats.
+ *
+ * The container manager polls the node manager to learn the state of
+ * datanodes that it is interested in.
+ *
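A hypothetical consumer in that spirit, polling the node manager for healthy datanodes, might look like the following sketch (the class name and wiring are assumptions, not part of this patch):

```java
import java.util.List;

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.node.NodeManager;

// Illustrative sketch only; not part of this patch.
final class ContainerPlacementSketch {
  private final NodeManager nodeManager;

  ContainerPlacementSketch(NodeManager nodeManager) {
    this.nodeManager = nodeManager;
  }

  /** Pick up to 'count' healthy datanodes for a new container. */
  List<DatanodeID> pickHealthyNodes(int count) {
    List<DatanodeID> healthy =
        nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
    // Remember: getNodes() returns a snapshot that may already be stale.
    return healthy.subList(0, Math.min(count, healthy.size()));
  }
}
```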
+ */ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java new file mode 100644 index 00000000000..08bddb47ea6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm; + +/** + * This package contains Storage Container Manager classes. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java new file mode 100644 index 00000000000..81fea897391 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -0,0 +1,864 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.node; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.test.GenericTestUtils; +import org.hamcrest.CoreMatchers; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; +import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; +import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test the Node Manager class. + */ +public class TestNodeManager { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void init() throws IOException { + } + + /** + * Returns a new copy of Configuration. + * + * @return Config + */ + Configuration getConf() { + return new OzoneConfiguration(); + } + + /** + * Create a new datanode ID. + * + * @return DatanodeID + */ + DatanodeID getDatanodeID() { + return getDatanodeID(UUID.randomUUID().toString()); + } + + /** + * Create a new DatanodeID with NodeID set to the string. + * + * @param uuid - node ID, it is generally UUID. + * @return DatanodeID. + */ + DatanodeID getDatanodeID(String uuid) { + Random random = new Random(); + String ipAddress = random.nextInt(256) + "." + + random.nextInt(256) + "." + + random.nextInt(256) + "." + + random.nextInt(256); + + String hostName = RandomStringUtils.randomAscii(8); + return new DatanodeID(ipAddress, hostName, uuid, + 0, 0, 0, 0); + } + + /** + * Creates a NodeManager. + * + * @param config - Config for the node manager. + * @return SCNNodeManager + * @throws IOException + */ + + SCMNodeManager createNodeManager(Configuration config) throws IOException { + SCMNodeManager nodeManager = new SCMNodeManager(config); + assertFalse("Node manager should be in safe mode", + nodeManager.isOutOfNodeSafeMode()); + return nodeManager; + } + + /** + * Tests that Node manager handles heartbeats correctly, and comes out of Safe + * Mode. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmHeartbeat() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + + // Send some heartbeats from different nodes. + for (int x = 0; x < nodeManager.getMinimumSafeModeNodes(); x++) { + nodeManager.updateHeartbeat(getDatanodeID()); + } + + // Wait for 4 seconds max. + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, + 4 * 1000); + + assertTrue("Heartbeat thread should have picked up the scheduled " + + "heartbeats and transitioned out of safe mode.", + nodeManager.isOutOfNodeSafeMode()); + } + } + + /** + * asserts that if we send no heartbeats node manager stays in safemode. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmNoHeartbeats() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, + 4 * 1000); + assertFalse("No heartbeats, Node manager should have been in safe mode.", + nodeManager.isOutOfNodeSafeMode()); + } + } + + /** + * Asserts that if we don't get enough unique nodes we stay in safemode. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmNotEnoughHeartbeats() throws IOException, + InterruptedException, TimeoutException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + + // Need 100 nodes to come out of safe mode, only one node is sending HB. + nodeManager.setMinimumSafeModeNodes(100); + nodeManager.updateHeartbeat(getDatanodeID()); + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, + 4 * 1000); + assertFalse("Not enough heartbeat, Node manager should have been in " + + "safemode.", nodeManager.isOutOfNodeSafeMode()); + } + } + + /** + * Asserts that many heartbeat from the same node is counted as a single + * node. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmSameNodeHeartbeats() throws IOException, + InterruptedException, TimeoutException { + + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + nodeManager.setMinimumSafeModeNodes(3); + DatanodeID datanodeID = getDatanodeID(); + + // Send 10 heartbeat from same node, and assert we never leave safe mode. + for (int x = 0; x < 10; x++) { + nodeManager.updateHeartbeat(datanodeID); + } + + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, + 4 * 1000); + assertFalse("Not enough nodes have send heartbeat to node manager.", + nodeManager.isOutOfNodeSafeMode()); + } + } + + /** + * Asserts that adding heartbeats after shutdown does not work. This implies + * that heartbeat thread has been shutdown safely by closing the node + * manager. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmShutdown() throws IOException, InterruptedException, + TimeoutException { + Configuration conf = getConf(); + conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); + SCMNodeManager nodeManager = createNodeManager(conf); + nodeManager.close(); + + // These should never be processed. 
+ nodeManager.updateHeartbeat(getDatanodeID()); + + // Let us just wait for 2 seconds to prove that HBs are not processed. + Thread.sleep(2 * 1000); + + assertFalse("Node manager executor service is shutdown, should never exit" + + " safe mode", nodeManager.isOutOfNodeSafeMode()); + + assertEquals("Assert new HBs were never processed", 0, + nodeManager.getLastHBProcessedCount()); + } + + /** + * Asserts that we detect as many healthy nodes as we have generated heartbeat + * for. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmHealthyNodeCount() throws IOException, + InterruptedException, TimeoutException { + Configuration conf = getConf(); + final int count = 10; + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + for (int x = 0; x < count; x++) { + nodeManager.updateHeartbeat(getDatanodeID()); + } + GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, + 4 * 1000); + assertEquals(count, nodeManager.getNodeCount(HEALTHY)); + } + } + + /** + * Asserts that if user provides a value less than 5 times the heartbeat + * interval as the StaleNode Value, we throw since that is a QoS that we + * cannot maintain. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + + @Test + public void testScmSanityOfUserConfig1() throws IOException, + InterruptedException, TimeoutException { + Configuration conf = getConf(); + final int interval = 100; + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, interval); + + thrown.expect(IllegalArgumentException.class); + + // This string is a multiple of the interval value + thrown.expectMessage( + startsWith("100 is not within min = 500 or max = 100000")); + createNodeManager(conf); + } + + /** + * Asserts that if Stale Interval value is more than 5 times the value of HB + * processing thread it is a sane value. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmSanityOfUserConfig2() throws IOException, + InterruptedException, TimeoutException { + Configuration conf = getConf(); + final int interval = 100; + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + createNodeManager(conf).close(); + } + + /** + * Asserts that a single node moves from Healthy to stale node if it misses + * the heartbeat. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmDetectStaleNode() throws IOException, + InterruptedException, TimeoutException { + Configuration conf = getConf(); + final int interval = 100; + final int nodeCount = 10; + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + // This should be 5 times more than OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS + // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List nodeList = new LinkedList<>(); + DatanodeID staleNode = getDatanodeID(); + for (int x = 0; x < nodeCount; x++) { + nodeList.add(getDatanodeID()); + } + // Heartbeat once + nodeManager.updateHeartbeat(staleNode); + + // Heartbeat all other nodes. + nodeList.forEach(nodeManager::updateHeartbeat); + + // Wait for 2 seconds .. and heartbeat good nodes again. + Thread.sleep(2 * 1000); + nodeList.forEach(nodeManager::updateHeartbeat); + + // Wait for 2 more seconds, 3 seconds is the stale window for this test + Thread.sleep(2 * 1000); + + List staleNodeList = nodeManager.getNodes(NodeManager + .NODESTATE.STALE); + assertEquals("Expected to find 1 stale node", 1, nodeManager + .getNodeCount(STALE)); + assertEquals("Expected to find 1 stale node", 1, staleNodeList.size()); + assertEquals("Stale node is not the expected ID", staleNode + .getDatanodeUuid(), staleNodeList.get(0).getDatanodeUuid()); + } + } + + /** + * Asserts that a single node moves from Healthy to dead node if it misses + * enough heartbeats. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmDetectDeadNode() throws IOException, + InterruptedException, TimeoutException { + final int interval = 100; + final int nodeCount = 10; + + Configuration conf = getConf(); + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); + + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + List nodeList = new LinkedList<>(); + + DatanodeID deadNode = getDatanodeID(); + for (int x = 0; x < nodeCount; x++) { + nodeList.add(getDatanodeID()); + } + // Heartbeat once + nodeManager.updateHeartbeat(deadNode); + + // Heartbeat all other nodes. + nodeList.forEach(nodeManager::updateHeartbeat); + + // Wait for 2 seconds .. and heartbeat good nodes again. + Thread.sleep(2 * 1000); + + nodeList.forEach(nodeManager::updateHeartbeat); + Thread.sleep(3 * 1000); + + // heartbeat good nodes again. + nodeList.forEach(nodeManager::updateHeartbeat); + + // 6 seconds is the dead window for this test , so we wait a total of + // 7 seconds to make sure that the node moves into dead state. + Thread.sleep(2 * 1000); + + // Check for the dead node now. + List deadNodeList = nodeManager + .getNodes(DEAD); + assertEquals("Expected to find 1 dead node", 1, nodeManager + .getNodeCount(DEAD)); + assertEquals("Expected to find 1 dead node", 1, deadNodeList.size()); + assertEquals("Dead node is not the expected ID", deadNode + .getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid()); + } + } + + /** + * Asserts that if we get duplicate registration calls for a datanode, we will + * ignore it and LOG the error. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmDuplicateRegistrationLogsError() throws IOException, + InterruptedException, TimeoutException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); + DatanodeID duplicateNodeID = getDatanodeID(); + nodeManager.registerNode(duplicateNodeID); + nodeManager.registerNode(duplicateNodeID); + logCapturer.stopCapturing(); + assertThat(logCapturer.getOutput(), containsString("Datanode is already" + + " registered.")); + } + } + + /** + * Asserts that we log an error for null in datanode ID. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmLogErrorOnNullDatanode() throws IOException, + InterruptedException, TimeoutException { + try (SCMNodeManager nodeManager = createNodeManager(getConf())) { + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); + nodeManager.updateHeartbeat(null); + logCapturer.stopCapturing(); + assertThat(logCapturer.getOutput(), containsString("Datanode ID in " + + "heartbeat is null")); + } + } + + /** + * Asserts that a dead node, stale node and healthy nodes co-exist. The counts + * , lists and node ID match the expected node state. + *
+ * This test is fairly complicated because it explores all states of the node
+ * manager in a single test. Please read through the comments to follow the
+ * state of the node manager at each step.
+ *
+ * This test is written like a state machine to avoid threads and concurrency + * issues. This test is replicated below with the use of threads. Avoiding + * threads make it easy to debug the state machine. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmClusterIsInExpectedState1() throws IOException, + InterruptedException, TimeoutException { + + DatanodeID healthyNode = getDatanodeID("HealthyNode"); + DatanodeID staleNode = getDatanodeID("StaleNode"); + DatanodeID deadNode = getDatanodeID("DeadNode"); + + /** + * These values are very important. Here is what it means so you don't + * have to look it up while reading this code. + * + * OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS - This the frequency of the + * HB processing thread that is running in the SCM. This thread must run + * for the SCM to process the Heartbeats. + * + * OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS - This is the frequency at which + * datanodes will send heartbeats to SCM. Please note: This is the only + * config value for node manager that is specified in seconds. We don't + * want SCM heartbeat resolution to be more than in seconds. + * In this test it is not used, but we are forced to set it because we + * have validation code that checks Stale Node interval and Dead Node + * interval is larger than the value of + * OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS. + * + * OZONE_SCM_STALENODE_INTERVAL_MS - This is the time that must elapse + * from the last heartbeat for us to mark a node as stale. In this test + * we set that to 3. That is if a node has not heartbeat SCM for last 3 + * seconds we will mark it as stale. + * + * OZONE_SCM_DEADNODE_INTERVAL_MS - This is the time that must elapse + * from the last heartbeat for a node to be marked dead. We have an + * additional constraint that this must be at least 2 times bigger than + * Stale node Interval. + * + * With these we are trying to explore the state of this cluster with + * various timeouts. Each section is commented so that you can keep + * track of the state of the cluster nodes. + * + */ + + Configuration conf = getConf(); + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); + + + /** + * Cluster state: Healthy: All nodes are heartbeat-ing like normal. + */ + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + nodeManager.updateHeartbeat(healthyNode); + nodeManager.updateHeartbeat(staleNode); + nodeManager.updateHeartbeat(deadNode); + + // Sleep so that heartbeat processing thread gets to run. + Thread.sleep(500); + + //Assert all nodes are healthy. + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + + /** + * Cluster state: Quiesced: We are going to sleep for 3 seconds. Which + * means that no node is heartbeating. All nodes should move to Stale. + */ + Thread.sleep(3 * 1000); + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(STALE)); + + + /** + * Cluster State : Move healthy node back to healthy state, move other 2 + * nodes to Stale State. + * + * We heartbeat healthy node after 1 second and let other 2 nodes elapse + * the 3 second windows. 
+ */ + + nodeManager.updateHeartbeat(healthyNode); + nodeManager.updateHeartbeat(staleNode); + nodeManager.updateHeartbeat(deadNode); + + Thread.sleep(1500); + nodeManager.updateHeartbeat(healthyNode); + Thread.sleep(2 * 1000); + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + + + // 3.5 seconds from last heartbeat for the stale and deadNode. So those + // 2 nodes must move to Stale state and the healthy node must + // remain in the healthy State. + List healthyList = nodeManager.getNodes(HEALTHY); + assertEquals("Expected one healthy node", 1, healthyList.size()); + assertEquals("Healthy node is not the expected ID", healthyNode + .getDatanodeUuid(), healthyList.get(0).getDatanodeUuid()); + + assertEquals(2, nodeManager.getNodeCount(STALE)); + + /** + * Cluster State: Allow healthyNode to remain in healthy state and + * staleNode to move to stale state and deadNode to move to dead state. + */ + + nodeManager.updateHeartbeat(healthyNode); + nodeManager.updateHeartbeat(staleNode); + Thread.sleep(1500); + nodeManager.updateHeartbeat(healthyNode); + Thread.sleep(2 * 1000); + + // 3.5 seconds have elapsed for stale node, so it moves into Stale. + // 7 seconds have elapsed for dead node, so it moves into dead. + // 2 Seconds have elapsed for healthy node, so it stays in healhty state. + healthyList = nodeManager.getNodes(HEALTHY); + List staleList = nodeManager.getNodes(STALE); + List deadList = nodeManager.getNodes(DEAD); + + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(1, nodeManager.getNodeCount(HEALTHY)); + assertEquals(1, nodeManager.getNodeCount(STALE)); + assertEquals(1, nodeManager.getNodeCount(DEAD)); + + assertEquals("Expected one healthy node", 1, healthyList.size()); + assertEquals("Healthy node is not the expected ID", healthyNode + .getDatanodeUuid(), healthyList.get(0).getDatanodeUuid()); + + assertEquals("Expected one stale node", 1, staleList.size()); + assertEquals("Stale node is not the expected ID", staleNode + .getDatanodeUuid(), staleList.get(0).getDatanodeUuid()); + + assertEquals("Expected one dead node", 1, deadList.size()); + assertEquals("Dead node is not the expected ID", deadNode + .getDatanodeUuid(), deadList.get(0).getDatanodeUuid()); + + /** + * Cluster State : let us heartbeat all the nodes and verify that we get + * back all the nodes in healthy state. + */ + nodeManager.updateHeartbeat(healthyNode); + nodeManager.updateHeartbeat(staleNode); + nodeManager.updateHeartbeat(deadNode); + Thread.sleep(500); + //Assert all nodes are healthy. + assertEquals(3, nodeManager.getAllNodes().size()); + assertEquals(3, nodeManager.getNodeCount(HEALTHY)); + } + } + + /** + * Heartbeat a given set of nodes at a specified frequency. + * + * @param manager - Node Manager + * @param list - List of datanodeIDs + * @param sleepDuration - Duration to sleep between heartbeats. + * @throws InterruptedException + */ + private void heartbeatNodeSet(NodeManager manager, List list, + int sleepDuration) throws InterruptedException { + while (!Thread.currentThread().isInterrupted()) { + list.forEach(manager::updateHeartbeat); + Thread.sleep(sleepDuration); + } + } + + /** + * Create a set of Nodes with a given prefix. + * + * @param count - number of nodes. + * @param prefix - A prefix string that can be used in verification. + * @return List of Nodes. 
+ */ + private List createNodeSet(int count, String prefix) { + List list = new LinkedList<>(); + for (int x = 0; x < count; x++) { + list.add(getDatanodeID(prefix + x)); + } + return list; + } + + /** + * Function that tells us if we found the right number of stale nodes. + * + * @param nodeManager - node manager + * @param count - number of stale nodes to look for. + * @return true if we found the expected number. + */ + private boolean findNodes(NodeManager nodeManager, int count, + NodeManager.NODESTATE state) { + return count == nodeManager.getNodeCount(state); + } + + /** + * Asserts that we can create a set of nodes that send its heartbeats from + * different threads and NodeManager behaves as expected. + * + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testScmClusterIsInExpectedState2() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 5000; + final int staleCount = 100; + final int deadCount = 10; + + Configuration conf = getConf(); + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); + conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000); + + List healthyNodeList = createNodeSet(healthyCount, "Healthy"); + List staleNodeList = createNodeSet(staleCount, "Stale"); + List deadNodeList = createNodeSet(deadCount, "Dead"); + + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + Runnable healthyNodeTask = () -> { + try { + // 2 second heartbeat makes these nodes stay healthy. + heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000); + } catch (InterruptedException ignored) { + } + }; + + Runnable staleNodeTask = () -> { + try { + // 4 second heartbeat makes these nodes go to stale and back to + // healthy again. + heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000); + } catch (InterruptedException ignored) { + } + }; + + + // No Thread just one time HBs the node manager, so that these will be + // marked as dead nodes eventually. + deadNodeList.forEach(nodeManager::updateHeartbeat); + + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + + Thread thread2 = new Thread(staleNodeTask); + thread2.setDaemon(true); + thread2.start(); + + Thread.sleep(10 * 1000); + + // Assert all healthy nodes are healthy now, this has to be a greater + // than check since Stale nodes can be healthy when we check the state. + + assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount); + + assertEquals(deadCount, nodeManager.getNodeCount(DEAD)); + + List deadList = nodeManager.getNodes(DEAD); + + for (DatanodeID node : deadList) { + assertThat(node.getDatanodeUuid(), CoreMatchers.startsWith("Dead")); + } + + // Checking stale nodes is tricky since they have to move between + // healthy and stale to avoid becoming dead nodes. So we search for + // that state for a while, if we don't find that state waitfor will + // throw. + GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), + 500, 4 * 1000); + + thread1.interrupt(); + thread2.interrupt(); + } + } + + /** + * Asserts that we can handle 6000+ nodes heartbeating SCM. 
+ * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmCanHandleScale() throws IOException, + InterruptedException, TimeoutException { + final int healthyCount = 3000; + final int staleCount = 3000; + List healthyList = createNodeSet(healthyCount, "h"); + List staleList = createNodeSet(staleCount, "s"); + Configuration conf = getConf(); + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000); + conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + Runnable healthyNodeTask = () -> { + try { + heartbeatNodeSet(nodeManager, healthyList, 2 * 1000); + } catch (InterruptedException ignored) { + + } + }; + + Runnable staleNodeTask = () -> { + try { + heartbeatNodeSet(nodeManager, staleList, 4 * 1000); + } catch (InterruptedException ignored) { + } + }; + + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + + Thread thread2 = new Thread(staleNodeTask); + thread2.setDaemon(true); + thread2.start(); + Thread.sleep(3 * 1000); + + GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE), + 500, 20 * 1000); + assertEquals("Node count mismatch", healthyCount + staleCount, nodeManager + .getAllNodes().size()); + + thread1.interrupt(); + thread2.interrupt(); + } + } + + /** + * Asserts that SCM backs off from HB processing instead of going into an + * infinite loop if SCM is flooded with too many heartbeats. This many not be + * the best thing to do, but SCM tries to protect itself and logs an error + * saying that it is getting flooded with heartbeats. In real world this can + * lead to many nodes becoming stale or dead due to the fact that SCM is not + * able to keep up with heartbeat processing. This test just verifies that SCM + * will log that information. + */ + @Test + public void testScmLogsHeartbeatFlooding() throws IOException, + InterruptedException { + final int healthyCount = 3000; + List healthyList = createNodeSet(healthyCount, "h"); + + // Make the HB process thread run slower. + Configuration conf = getConf(); + conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 500); + conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1); + conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + GenericTestUtils.LogCapturer logCapturer = + GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); + Runnable healthyNodeTask = () -> { + try { + // No wait in the HB sending loop. + heartbeatNodeSet(nodeManager, healthyList, 0); + } catch (InterruptedException ignored) { + } + }; + Thread thread1 = new Thread(healthyNodeTask); + thread1.setDaemon(true); + thread1.start(); + + Thread.sleep(6 * 1000); + + + thread1.interrupt(); + logCapturer.stopCapturing(); + + assertThat(logCapturer.getOutput(), containsString("SCM is being " + + "flooded by heartbeats. Not able to keep up with the heartbeat " + + "counts.")); + } + } +}