diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java index cb6a3cd3331..53f37b5a003 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -23,11 +23,14 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom; import org.apache.hadoop.ozone.scm.node.NodeManager; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.utils.LevelDBStore; +import org.iq80.leveldb.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +42,6 @@ import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.iq80.leveldb.Options; /** * Mapping class contains the mapping from a name to a pipeline mapping. This is @@ -94,8 +96,7 @@ public ContainerMapping(final Configuration conf, this.containerSize = OzoneConsts.GB * conf.getInt( ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); - - this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); + this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf); } /** @@ -105,9 +106,10 @@ public ContainerMapping(final Configuration conf, * @param conf - configuration. * @return SCM container placement policy implementation instance. 
*/ + @SuppressWarnings("unchecked") private static ContainerPlacementPolicy createContainerPlacementPolicy( final NodeManager nodeManager, final Configuration conf) { - Class implClass = + Class implClass = (Class) conf.getClass( ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, SCMContainerPlacementRandom.class); @@ -123,15 +125,18 @@ private static ContainerPlacementPolicy createContainerPlacementPolicy( throw new RuntimeException(implClass.getName() + " could not be constructed.", e.getCause()); } catch (Exception e) { + LOG.error("Unhandled exception occured, Placement policy will not be " + + "functional."); + throw new IllegalArgumentException("Unable to load " + + "ContainerPlacementPolicy", e); } - return null; } /** * Translates a list of nodes, ordered such that the first is the leader, into * a corresponding {@link Pipeline} object. * @param nodes - list of datanodes on which we will allocate the container. - * The first of the list will be the leader node. + * The first of the list will be the leader node. * @param containerName container name * @return pipeline corresponding to nodes */ @@ -148,7 +153,6 @@ private static Pipeline newPipelineFromNodes(final List nodes, return pipeline; } - /** * Returns the Pipeline from the container name. 
* @@ -157,7 +161,7 @@ private static Pipeline newPipelineFromNodes(final List nodes, */ @Override public Pipeline getContainer(final String containerName) throws IOException { - Pipeline pipeline = null; + Pipeline pipeline; lock.lock(); try { byte[] pipelineBytes = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerPlacementPolicy.java index db21bbe908b..0cf1fde372c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerPlacementPolicy.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.ozone.scm.container; +package org.apache.hadoop.ozone.scm.container.placement.algorithms; import org.apache.hadoop.hdfs.protocol.DatanodeID; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/SCMContainerPlacementCapacity.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/SCMContainerPlacementCapacity.java deleted file mode 100644 index 0f1b41e456d..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/SCMContainerPlacementCapacity.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.scm.container; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.node.SCMNodeStat; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.stream.Collectors; - -import static java.lang.Math.abs; - -/** - * Container placement policy that randomly choose datanodes with remaining - * space satisfy the size constraints. 
- */ -public final class SCMContainerPlacementCapacity - implements ContainerPlacementPolicy { - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(SCMContainerPlacementCapacity.class); - - private static int maxRetry = 100; - private final NodeManager nodeManager; - private final Random rand; - private final Configuration conf; - - public SCMContainerPlacementCapacity(final NodeManager nodeManager, - final Configuration conf) { - this.nodeManager = nodeManager; - this.rand = new Random(); - this.conf = conf; - } - - @Override - public List chooseDatanodes(final int nodesRequired, - final long sizeRequired) throws IOException { - - List healthyNodes = - nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY); - - if (healthyNodes.size() == 0) { - throw new IOException("No healthy node found to allocate container."); - } - - if (healthyNodes.size() < nodesRequired) { - throw new IOException("Not enough nodes to allocate container with " + - nodesRequired + " datanodes required."); - } - - if (healthyNodes.size() == nodesRequired) { - return healthyNodes; - } - - // TODO: add allocation time as metrics - long beginTime = Time.monotonicNow(); - Set results = new HashSet<>(); - for (int i = 0; i < nodesRequired; i++) { - DatanodeID candidate = chooseNode(results, healthyNodes, sizeRequired); - if (candidate != null) { - results.add(candidate); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding datanode {}. Results.size = {} nodesRequired = {}", - candidate, results.size(), nodesRequired); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Can't find datanode. Results.size = {} nodesRequired = {}", - results.size(), nodesRequired); - } - break; - } - } - if (LOG.isTraceEnabled()) { - long endTime = Time.monotonicNow(); - LOG.trace("SCMContainerPlacementCapacity takes {} ms to choose nodes.", - endTime - beginTime); - } - - // TODO: handle under replicated case. - // For now, throw exception only when we can't find any datanode. 
- if (results.size() == 0) { - throw new IOException("No healthy node found " + - "with enough remaining capacity to allocate container."); - } - - if (results.size() != nodesRequired) { - if (LOG.isDebugEnabled()) { - LOG.debug("SCMContainerPlacementCapacity cannot find enough healthy" + - " datanodes with remaining capacity > {} ." + - "(nodesRequired = {}, nodesFound = {})", sizeRequired, - nodesRequired, results.size()); - } - } - - return results.stream().collect(Collectors.toList()); - } - - /** - * Choose one random node from 2-Random nodes that satisfy the size required. - * @param results - set of current chosen datanodes. - * @param healthyNodes - all healthy datanodes. - * @param sizeRequired - size required for container. - * @return one with larger remaining capacity from two randomly chosen - * datanodes that satisfy sizeRequirement but are not in current - * result set. - */ - private DatanodeID chooseNode(final Set results, - final List healthyNodes, final long sizeRequired) { - NodeAndStat firstNode = chooseOneNode(results, healthyNodes, - sizeRequired); - if (firstNode == null) { - return null; - } - - NodeAndStat secondNode = chooseOneNode(results, healthyNodes, - sizeRequired); - if (secondNode == null) { - return firstNode.getDatanodeID(); - } - - // Pick one with larger remaining space. - return firstNode.getDatanodeStat().getRemaining() > - secondNode.getDatanodeStat().getRemaining() ? - firstNode.getDatanodeID() : secondNode.getDatanodeID(); - } - - /** - * Choose one random node from healthy nodes that satisfies the size - * requirement and has not been chosen in the existing results. - * Retry up to maxRetry(100) times. - * @param results - set of current chosen datanodes. - * @param healthyNodes - all healthy datanodes. - * @param sizeRequired - size required for container. - * @return one with larger remaining capacity from two randomly chosen - * datanodes that satisfy sizeRequirement but are not in current - * result set. 
- */ - private NodeAndStat chooseOneNode(final Set results, - final List healthyNodes, final long sizeRequired) { - NodeAndStat selectedNode = null; - int retry = 0; - while (selectedNode == null && retry < maxRetry) { - int candidateIdx = abs(rand.nextInt() % healthyNodes.size()); - DatanodeID candidate = healthyNodes.get(candidateIdx); - if (!results.contains(candidate)) { - SCMNodeStat stat = nodeManager.getNodeStat(candidate); - if (stat != null && stat.getRemaining() > sizeRequired) { - selectedNode = new NodeAndStat(candidate, stat); - break; - } - } - retry++; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Find {} after {} retries!", (selectedNode != null) ? - selectedNode.getDatanodeID() : "no datanode", retry); - } - return selectedNode; - } - - /** - * Helper class wraps DatanodeID and SCMNodeStat. - */ - static class NodeAndStat { - private final DatanodeID datanodeID; - private final SCMNodeStat stat; - - NodeAndStat(final DatanodeID id, final SCMNodeStat stat) { - this.datanodeID = id; - this.stat = stat; - } - - public DatanodeID getDatanodeID() { - return datanodeID; - } - - public SCMNodeStat getDatanodeStat() { - return stat; - } - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/SCMContainerPlacementRandom.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/SCMContainerPlacementRandom.java deleted file mode 100644 index cecfcddfc68..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/SCMContainerPlacementRandom.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. 
The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.scm.container; - - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.stream.Collectors; - -import static java.lang.Math.abs; - -/** - * Container placement policy that randomly chooses healthy datanodes. - */ -public final class SCMContainerPlacementRandom - implements ContainerPlacementPolicy { - @VisibleForTesting - static final Logger LOG = - LoggerFactory.getLogger(SCMContainerPlacementRandom.class); - - private static int maxRetry = 100; - private final NodeManager nodeManager; - private final Random rand; - private final Configuration conf; - - public SCMContainerPlacementRandom(final NodeManager nodeManager, - final Configuration conf) { - this.nodeManager = nodeManager; - this.rand = new Random(); - this.conf = conf; - } - - @Override - public List chooseDatanodes(final int nodesRequired, - final long sizeRequired) throws IOException { - - List healthyNodes = - nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY); - - if (healthyNodes.size() == 0) { - throw new IOException("No healthy node found to allocate container."); - } - - if (healthyNodes.size() < nodesRequired) { - throw new IOException("Not enough nodes to allocate container with " - + nodesRequired + " datanodes required."); - } - - if 
(healthyNodes.size() == nodesRequired) { - return healthyNodes; - } - - // TODO: add allocation time as metrics - long beginTime = Time.monotonicNow(); - Set results = new HashSet<>(); - for (int i = 0; i < nodesRequired; i++) { - DatanodeID candidate = chooseNode(results, healthyNodes); - if (candidate != null) { - results.add(candidate); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding datanode {}. Results.size = {} nodesRequired = {}", - candidate, results.size(), nodesRequired); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Can't find datanode. Results.size = {} nodesRequired = {}", - results.size(), nodesRequired); - } - break; - } - } - if (LOG.isTraceEnabled()) { - long endTime = Time.monotonicNow(); - LOG.trace("SCMContainerPlacementRandom takes {} ms to choose nodes.", - endTime - beginTime); - } - - if (results.size() != nodesRequired) { - if (LOG.isDebugEnabled()) { - LOG.debug("SCMContainerPlacementRandom cannot find enough healthy" + - " datanodes. (nodesRequired = {}, nodesFound = {})", - nodesRequired, results.size()); - } - } - return results.stream().collect(Collectors.toList()); - } - - /** - * Choose one random node from 2-Random nodes. Retry up to 100 times until - * find one that has not been chosen in the exising results. - * @param results - set of current chosen datanodes. - * @param healthyNodes - all healthy datanodes. - * @return one randomly chosen datanode that from two randomly chosen datanode - * that are not in current result set. - */ - private DatanodeID chooseNode(final Set results, - final List healthyNodes) { - DatanodeID selectedNode = null; - int retry = 0; - while (selectedNode == null && retry < maxRetry) { - DatanodeID firstNode = healthyNodes.get( - abs(rand.nextInt() % healthyNodes.size())); - DatanodeID secondNode = healthyNodes.get( - abs(rand.nextInt() % healthyNodes.size())); - // Randomly pick one from two candidates. - selectedNode = rand.nextBoolean() ? 
firstNode : secondNode; - if (results.contains(selectedNode)) { - selectedNode = null; - } else { - break; - } - retry++; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Find {} after {} retries!", (selectedNode != null) ? - selectedNode : "no datanode", retry); - } - return selectedNode; - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java new file mode 100644 index 00000000000..5078111d982 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; + +/** + * SCM CommonPolicy implements a set of invariants which are common + * for all container placement policies and acts as the repository of helper + * functions which are common to placement policies. + */ +public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(SCMCommonPolicy.class); + private final NodeManager nodeManager; + private final Random rand; + private final Configuration conf; + + /** + * Constructs SCM Common Policy Class. + * + * @param nodeManager NodeManager + * @param conf Configuration class. + */ + public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) { + this.nodeManager = nodeManager; + this.rand = new Random(); + this.conf = conf; + } + + /** + * Return node manager. + * + * @return node manager + */ + public NodeManager getNodeManager() { + return nodeManager; + } + + /** + * Returns the Random Object. + * + * @return rand + */ + public Random getRand() { + return rand; + } + + /** + * Get Config. 
+ * + * @return Configuration + */ + public Configuration getConf() { + return conf; + } + + /** + * Given the replication factor and size required, return set of datanodes + * that satisfy the nodes and size requirement. + *

+ * Here are some invariants of container placement. + *

+ * 1. We place containers only on healthy nodes. + * 2. We place containers on nodes with enough space for that container. + * 3. If a set of containers is requested, we either meet the required + * number of nodes or we fail that request. + * + * @param nodesRequired - number of datanodes required. + * @param sizeRequired - size required for the container or block. + * @return healthy datanodes with enough space; at least nodesRequired of them. + * @throws SCMException if too few healthy nodes, or too few with space, exist. + */ + + public List chooseDatanodes(int nodesRequired, final long + sizeRequired) throws SCMException { + List healthyNodes = + nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY); + String msg; + if (healthyNodes.size() == 0) { + msg = "No healthy node found to allocate container."; + LOG.error(msg); + throw new SCMException(msg, SCMException.ResultCodes + .FAILED_TO_FIND_HEALTHY_NODES); + } + + if (healthyNodes.size() < nodesRequired) { + msg = String.format("Not enough healthy nodes to allocate container. %d " + " datanodes required. Found %d", + nodesRequired, healthyNodes.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + List healthyList = healthyNodes.stream().filter(d -> + hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList()); + + if (healthyList.size() < nodesRequired) { + msg = String.format("Unable to find enough nodes that meet the space " + + "requirement in healthy node set. Nodes required: %d Found: %d", + nodesRequired, healthyList.size()); + LOG.error(msg); + throw new SCMException(msg, + SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE); + } + + return healthyList; + } + + /** + * Returns true if this node has enough space to meet our requirement. + * + * @param datanodeID DatanodeID + * @param sizeRequired - size checked against the node's remaining stat. + * @return true if a stat exists for the node and it has enough space. 
+ */ + private boolean hasEnoughSpace(DatanodeID datanodeID, long sizeRequired) { + SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeID); + return (nodeMetric != null) && nodeMetric.get().getRemaining() + .hasResources(sizeRequired); + } + + /** + * This function invokes the derived class's chooseNode function to build a + * list of nodes. Then it verifies that the invoked policy was able to return + * the expected number of nodes. + * + * @param nodesRequired - Nodes Required + * @param healthyNodes - candidate nodes, handed to chooseNode on each call. + * @return List of Datanodes that can be used for placement. + * @throws SCMException if fewer than nodesRequired nodes were chosen. + */ + public List getResultSet(int nodesRequired, List + healthyNodes) throws SCMException { + List results = new LinkedList<>(); + for (int x = 0; x < nodesRequired; x++) { + // Invoke the choose function defined in the derived class; skip nulls. + DatanodeID nodeId = chooseNode(healthyNodes); + if (nodeId != null) { + results.add(nodeId); + } + } + + if (results.size() < nodesRequired) { + LOG.error("Unable to find the required number of healthy nodes that " + + "meet the criteria. Required nodes: {}, Found nodes: {}", + nodesRequired, results.size()); + throw new SCMException("Unable to find required number of nodes.", + SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE); + } + return results; + } + + /** + * Choose a datanode according to the policy; this function is implemented + * by the actual policy class, for example PlacementCapacity or PlacementRandom. + * + * @param healthyNodes - List of healthy nodes we can choose from. 
+ * @return the chosen DatanodeID + */ + public abstract DatanodeID chooseNode(List healthyNodes); + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java new file mode 100644 index 00000000000..da85eeee92e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Container placement policy that randomly choose datanodes with remaining + * space to satisfy the size constraints. + *

+ * The algorithm is as follows: pick 2 random nodes from a given pool of nodes + * and then pick the node which has lower utilization. This leads to a higher + * probability of nodes with lower utilization being picked. + *

+ * For those wondering why we choose two nodes randomly and choose the node + * with lower utilization: there are links to the original papers in + * HDFS-11564. + *

+ * A brief summary -- We rank the s nodes on a scale from lowest utilized to + * highest utilized; there are (s * (s - 1)) / 2 possibilities to build + * distinct pairs of nodes. There are s - k pairs of nodes in which the rank + * k node is the less utilized of the couple. So the probability of picking + * the rank k node is (2 * (s - k)) / (s * (s - 1)). + *

+ * In English, There is a much higher probability of picking less utilized nodes + * as compared to nodes with higher utilization since we pick 2 nodes and + * then pick the node with lower utilization. + *

+ * This avoids the issue of users adding new nodes into the cluster and HDFS + * sending all traffic to those nodes if we only use a capacity based + * allocation scheme. Unless those nodes are part of the set of the first 2 + * nodes then newer nodes will not be in the running to get the container. + *

+ * This leads to an I/O pattern where the lower utilized nodes are favoured + * more than higher utilized nodes, but part of the I/O will still go to the + * older higher utilized nodes. + *

+ * With this algorithm in place, our hope is that balancer tool needs to do
+ * little or no work and the cluster will achieve a balanced distribution
+ * over time.
+ */
+public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
+
+  /**
+   * Constructs a Container Placement with considering only capacity.
+   * That is this policy tries to place containers based on node weight.
+   *
+   * @param nodeManager Node Manager
+   * @param conf Configuration
+   */
+  public SCMContainerPlacementCapacity(final NodeManager nodeManager,
+      final Configuration conf) {
+    super(nodeManager, conf);
+  }
+
+  /**
+   * Called by SCM to choose datanodes.
+   *
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return List of exactly nodesRequired datanodes.
+   * @throws SCMException if not enough suitable datanodes are available.
+   */
+  @Override
+  public List chooseDatanodes(final int nodesRequired,
+      final long sizeRequired) throws SCMException {
+    List healthyNodes =
+        super.chooseDatanodes(nodesRequired, sizeRequired);
+    if (healthyNodes.size() == nodesRequired) {
+      return healthyNodes;
+    }
+    return getResultSet(nodesRequired, healthyNodes);
+  }
+
+  /**
+   * Draw two random candidates from the healthy list, keep the first one if
+   * its metric isGreater than the second's (otherwise the second), and remove
+   * the chosen node from the list so that it cannot be returned again.
+   *
+   * @param healthyNodes - healthy nodes that meet the size requirement.
+   * @return DatanodeID that is chosen; it is no longer in healthyNodes.
+   */
+  @Override
+  public DatanodeID chooseNode(List healthyNodes) {
+    int firstNodeNdx = getRand().nextInt(healthyNodes.size());
+    int secondNodeNdx = getRand().nextInt(healthyNodes.size());
+
+    // Both draws can land on the same index; there is nothing to compare then,
+    // but the node must still leave the pool so it cannot be chosen twice.
+    if (firstNodeNdx == secondNodeNdx) {
+      return healthyNodes.remove(firstNodeNdx);
+    }
+
+    DatanodeID firstNodeID = healthyNodes.get(firstNodeNdx);
+    DatanodeID secondNodeID = healthyNodes.get(secondNodeNdx);
+    SCMNodeMetric firstNodeMetric = getNodeManager().getNodeStat(firstNodeID);
+    SCMNodeMetric secondNodeMetric = getNodeManager().getNodeStat(secondNodeID);
+
+    DatanodeID chosenID = firstNodeMetric.isGreater(secondNodeMetric.get())
+        ? firstNodeID : secondNodeID;
+
+    healthyNodes.remove(chosenID);
+    return chosenID;
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
new file mode 100644
index 00000000000..b145b14a605
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement.algorithms; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Container placement policy that randomly chooses healthy datanodes. + * This is very similar to current HDFS placement. That is we + * just randomly place containers without any considerations of utilization. + *

+ * That means we rely on balancer to achieve even distribution of data.
+ * Balancer will need to support containers as a feature before this class
+ * can be practically used.
+ */
+public final class SCMContainerPlacementRandom extends SCMCommonPolicy
+    implements ContainerPlacementPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
+
+  /**
+   * Construct a random Block Placement policy.
+   *
+   * @param nodeManager nodeManager
+   * @param conf Config
+   */
+  public SCMContainerPlacementRandom(final NodeManager nodeManager,
+      final Configuration conf) {
+    super(nodeManager, conf);
+  }
+
+  /**
+   * Called by the SCM to choose datanodes.
+   *
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block.
+   * @return List of exactly nodesRequired datanodes.
+   * @throws SCMException if not enough suitable datanodes are available.
+   */
+  @Override
+  public List chooseDatanodes(final int nodesRequired,
+      final long sizeRequired) throws SCMException {
+    List healthyNodes =
+        super.chooseDatanodes(nodesRequired, sizeRequired);
+
+    if (healthyNodes.size() == nodesRequired) {
+      return healthyNodes;
+    }
+    return getResultSet(nodesRequired, healthyNodes);
+  }
+
+  /**
+   * Choose a node at random and remove it from the list of nodes we can
+   * choose from, so that repeated calls never return the same list entry.
+   *
+   * @param healthyNodes - candidate datanodes; must be non-empty and is
+   *                     modified by this call.
+   * @return the randomly chosen datanode, already removed from healthyNodes.
+   */
+  @Override
+  public DatanodeID chooseNode(final List healthyNodes) {
+    // Remove by index: takes out exactly the drawn slot without an equals scan.
+    return healthyNodes.remove(getRand().nextInt(healthyNodes.size()));
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/package-info.java
new file mode 100644
index 00000000000..d6280df4309
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/algorithms/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.container.placement.algorithms; +// Various placement algorithms. \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/DatanodeMetric.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/DatanodeMetric.java new file mode 100644 index 00000000000..cc829c2c3f0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/DatanodeMetric.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement.metrics; + +import org.apache.hadoop.ozone.scm.exceptions.SCMException; + +/** + * DatanodeMetric acts as the basis for all the metric that is used in + * comparing 2 datanodes. + */ +public interface DatanodeMetric extends Comparable { + + /** + * Some syntactic sugar over Comparable interface. This makes code easier to + * read. + * + * @param o - Other Object + * @return - True if *this* object is greater than argument. + */ + boolean isGreater(T o); + + /** + * Inverse of isGreater. + * + * @param o - other object. + * @return True if *this* object is Lesser than argument. + */ + boolean isLess(T o); + + /** + * Returns true if the object has same values. Because of issues with + * equals, and loss of type information this interface supports isEqual. + * + * @param o object to compare. + * @return True, if the values match. + */ + boolean isEqual(T o); + + /** + * A resourceCheck, defined by resourceNeeded. + * For example, S could be bytes required + * and DatanodeMetric can reply by saying it can be met or not. + * + * @param resourceNeeded - ResourceNeeded in its own metric. + * @return boolean, True if this resource requirement can be met. + */ + boolean hasResources(S resourceNeeded) throws SCMException; + + /** + * Returns the metric. + * + * @return T, the object that represents this metric. + */ + T get(); + + /** + * Sets the value of this metric. + * + * @param value - value of the metric. + */ + void set(T value); + + /** + * Adds a value of to the base. + * @param value - value + */ + void add(T value); + + /** + * subtract a value. 
+ * @param value value + */ + void subtract(T value); + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java new file mode 100644 index 00000000000..5fc22b10eae --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/LongMetric.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.container.placement.metrics; + +/** + * An helper class for all metrics based on Longs. + */ +public class LongMetric implements DatanodeMetric { + private Long value; + + /** + * Constructs a long Metric. + * + * @param value Value for this metric. + */ + public LongMetric(Long value) { + this.value = value; + } + + /** + * Some syntactic sugar over Comparable interface. This makes code easier to + * read. + * + * @param o - Other Object + * @return - True if *this* object is greater than argument. + */ + @Override + public boolean isGreater(Long o) { + return compareTo(o) > 0; + } + + /** + * Inverse of isGreater. + * + * @param o - other object. + * @return True if *this* object is Lesser than argument. + */ + @Override + public boolean isLess(Long o) { + return compareTo(o) < 0; + } + + /** + * Returns true if the object has same values. Because of issues with + * equals, and loss of type information this interface supports isEqual. + * + * @param o object to compare. + * @return True, if the values match. + */ + @Override + public boolean isEqual(Long o) { + return compareTo(o) == 0; + } + + /** + * A resourceCheck, defined by resourceNeeded. + * For example, S could be bytes required + * and DatanodeMetric can reply by saying it can be met or not. + * + * @param resourceNeeded - ResourceNeeded in its own metric. + * @return boolean, True if this resource requirement can be met. + */ + @Override + public boolean hasResources(Long resourceNeeded) { + return isGreater(resourceNeeded); + } + + /** + * Returns the metric. + * + * @return T, the object that represents this metric. 
+ */ + @Override + public Long get() { + return this.value; + } + + /** + * Sets the value of this metric. + * + * @param value - value of the metric. + */ + @Override + public void set(Long value) { + this.value = value; + + } + + /** + * Adds a value of to the base. + * + * @param value - value + */ + @Override + public void add(Long value) { + this.value += value; + } + + /** + * subtract a value. + * + * @param value value + */ + @Override + public void subtract(Long value) { + this.value -= value; + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(Long o) { + return Long.compare(this.value, o); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LongMetric that = (LongMetric) o; + + return value != null ? value.equals(that.value) : that.value == null; + } + + @Override + public int hashCode() { + return value != null ? 
value.hashCode() : 0; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMNodeMetric.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMNodeMetric.java new file mode 100644 index 00000000000..ae01361663c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMNodeMetric.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * SCM Node Metric that is used in the placement classes. + */ +public class SCMNodeMetric implements DatanodeMetric { + private SCMNodeStat stat; + + /** + * Constructs an SCMNode Metric. + * + * @param stat - SCMNodeStat. + */ + public SCMNodeMetric(SCMNodeStat stat) { + this.stat = stat; + } + + /** + * Set the capacity, used and remaining space on a datanode. + * + * @param capacity in bytes + * @param used in bytes + * @param remaining in bytes + */ + @VisibleForTesting + public SCMNodeMetric(long capacity, long used, long remaining) { + this.stat = new SCMNodeStat(); + this.stat.set(capacity, used, remaining); + } + + /** + * + * @param o - Other Object + * @return - True if *this* object is greater than argument. + */ + @Override + public boolean isGreater(SCMNodeStat o) { + Preconditions.checkNotNull(o, "Argument cannot be null"); + + // if zero, replace with 1 for the division to work. + long thisDenominator = (this.stat.getCapacity().get() == 0) + ? 1 : this.stat.getCapacity().get(); + long otherDenominator = (o.getCapacity().get() == 0) + ? 1 : o.getCapacity().get(); + + float thisNodeWeight = + stat.getScmUsed().get() / (float) thisDenominator; + + float oNodeWeight = + o.getScmUsed().get() / (float) otherDenominator; + + if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) { + return thisNodeWeight > oNodeWeight; + } + // if these nodes are have similar weight then return the node with more + // free space as the greater node. 
+ return stat.getRemaining().isGreater(o.getRemaining().get()); + } + + /** + * Inverse of isGreater. + * + * @param o - other object. + * @return True if *this* object is Lesser than argument. + */ + @Override + public boolean isLess(SCMNodeStat o) { + Preconditions.checkNotNull(o, "Argument cannot be null"); + + // if zero, replace with 1 for the division to work. + long thisDenominator = (this.stat.getCapacity().get() == 0) + ? 1 : this.stat.getCapacity().get(); + long otherDenominator = (o.getCapacity().get() == 0) + ? 1 : o.getCapacity().get(); + + float thisNodeWeight = + stat.getScmUsed().get() / (float) thisDenominator; + + float oNodeWeight = + o.getScmUsed().get() / (float) otherDenominator; + + if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) { + return thisNodeWeight < oNodeWeight; + } + + // if these nodes are have similar weight then return the node with less + // free space as the lesser node. + return stat.getRemaining().isLess(o.getRemaining().get()); + } + + /** + * Returns true if the object has same values. Because of issues with + * equals, and loss of type information this interface supports isEqual. + * + * @param o object to compare. + * @return True, if the values match. + * TODO : Consider if it makes sense to add remaining to this equation. + */ + @Override + public boolean isEqual(SCMNodeStat o) { + float thisNodeWeight = stat.getScmUsed().get() / (float) + stat.getCapacity().get(); + float oNodeWeight = o.getScmUsed().get() / (float) o.getCapacity().get(); + return Math.abs(thisNodeWeight - oNodeWeight) < 0.000001; + } + + /** + * A resourceCheck, defined by resourceNeeded. + * For example, S could be bytes required + * and DatanodeMetric can reply by saying it can be met or not. + * + * @param resourceNeeded - ResourceNeeded in its own metric. + * @return boolean, True if this resource requirement can be met. + */ + @Override + public boolean hasResources(Long resourceNeeded) { + return false; + } + + /** + * Returns the metric. 
+ * + * @return T, the object that represents this metric. + */ + @Override + public SCMNodeStat get() { + return stat; + } + + /** + * Sets the value of this metric. + * + * @param value - value of the metric. + */ + @Override + public void set(SCMNodeStat value) { + stat.set(value.getCapacity().get(), value.getScmUsed().get(), + value.getRemaining().get()); + } + + /** + * Adds a value of to the base. + * + * @param value - value + */ + @Override + public void add(SCMNodeStat value) { + stat.add(value); + } + + /** + * subtract a value. + * + * @param value value + */ + @Override + public void subtract(SCMNodeStat value) { + stat.subtract(value); + } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object is + * less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + */ + @Override + public int compareTo(SCMNodeStat o) { + if (isEqual(o)) { + return 0; + } + if (isGreater(o)) { + return 1; + } else { + return -1; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SCMNodeMetric that = (SCMNodeMetric) o; + + return stat != null ? stat.equals(that.stat) : that.stat == null; + } + + @Override + public int hashCode() { + return stat != null ? 
stat.hashCode() : 0; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMNodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMNodeStat.java new file mode 100644 index 00000000000..bb189748717 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/SCMNodeStat.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * This class represents the SCM node stat. + */ +public class SCMNodeStat implements NodeStat { + private LongMetric capacity; + private LongMetric scmUsed; + private LongMetric remaining; + + public SCMNodeStat() { + this(0L, 0L, 0L); + } + + public SCMNodeStat(SCMNodeStat other) { + this(other.capacity.get(), other.scmUsed.get(), other.remaining.get()); + } + + public SCMNodeStat(long capacity, long used, long remaining) { + Preconditions.checkArgument(capacity >= 0, "Capacity cannot be " + + "negative."); + Preconditions.checkArgument(used >= 0, "used space cannot be " + + "negative."); + Preconditions.checkArgument(remaining >= 0, "remaining cannot be " + + "negative"); + this.capacity = new LongMetric(capacity); + this.scmUsed = new LongMetric(used); + this.remaining = new LongMetric(remaining); + } + + /** + * @return the total configured capacity of the node. + */ + public LongMetric getCapacity() { + return capacity; + } + + /** + * @return the total SCM used space on the node. + */ + public LongMetric getScmUsed() { + return scmUsed; + } + + /** + * @return the total remaining space available on the node. + */ + public LongMetric getRemaining() { + return remaining; + } + + /** + * Set the capacity, used and remaining space on a datanode. 
+ * + * @param capacity in bytes + * @param used in bytes + * @param remaining in bytes + */ + @VisibleForTesting + public void set(long capacity, long used, long remaining) { + Preconditions.checkNotNull(capacity, "Capacity cannot be null"); + Preconditions.checkNotNull(used, "used cannot be null"); + Preconditions.checkNotNull(remaining, "remaining cannot be null"); + + Preconditions.checkArgument(capacity >= 0, "Capacity cannot be " + + "negative."); + Preconditions.checkArgument(used >= 0, "used space cannot be " + + "negative."); + Preconditions.checkArgument(remaining >= 0, "remaining cannot be " + + "negative"); + + this.capacity = new LongMetric(capacity); + this.scmUsed = new LongMetric(used); + this.remaining = new LongMetric(remaining); + } + + /** + * Adds a new nodestat to existing values of the node. + * + * @param stat Nodestat. + * @return SCMNodeStat + */ + public SCMNodeStat add(NodeStat stat) { + this.capacity.set(this.getCapacity().get() + stat.getCapacity().get()); + this.scmUsed.set(this.getScmUsed().get() + stat.getScmUsed().get()); + this.remaining.set(this.getRemaining().get() + stat.getRemaining().get()); + return this; + } + + /** + * Subtracts the stat values from the existing NodeStat. + * + * @param stat SCMNodeStat. 
+ * @return Modified SCMNodeStat + */ + public SCMNodeStat subtract(NodeStat stat) { + this.capacity.set(this.getCapacity().get() - stat.getCapacity().get()); + this.scmUsed.set(this.getScmUsed().get() - stat.getScmUsed().get()); + this.remaining.set(this.getRemaining().get() - stat.getRemaining().get()); + return this; + } + + @Override + public boolean equals(Object to) { + if (to instanceof SCMNodeStat) { + SCMNodeStat tempStat = (SCMNodeStat) to; + return capacity.isEqual(tempStat.getCapacity().get()) && + scmUsed.isEqual(tempStat.getScmUsed().get()) && + remaining.isEqual(tempStat.getRemaining().get()); + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/package-info.java new file mode 100644 index 00000000000..51350958e15 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/metrics/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.container.placement.metrics; + +// Various metrics supported by Datanode and used by SCM in the placement +// strategy. \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/package-info.java new file mode 100644 index 00000000000..43676effc61 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/placement/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container.placement; +// Classes related to container placement. \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java index 7acec4df376..c716d290e59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,19 +16,13 @@ * limitations under the License. */ package org.apache.hadoop.ozone.scm.exceptions; + import java.io.IOException; /** * Exception thrown by SCM. */ public class SCMException extends IOException { - /** - * Error codes to make it easy to decode these exceptions. - */ - public enum ResultCodes { - FAILED_TO_LOAD_NODEPOOL, - NODE_NOT_FOUND_IN_NODEPOOL, - } private final ResultCodes result; /** @@ -88,4 +82,24 @@ public SCMException(Throwable cause, ResultCodes result) { super(cause); this.result = result; } + + /** + * Returns resultCode. + * @return ResultCode + */ + public ResultCodes getResult() { + return result; + } + + /** + * Error codes to make it easy to decode these exceptions. + */ + public enum ResultCodes { + FAILED_TO_LOAD_NODEPOOL, + FAILED_TO_FIND_NODE_IN_POOL, + FAILED_TO_FIND_HEALTHY_NODES, + FAILED_TO_FIND_NODES_WITH_SPACE, + FAILED_TO_FIND_SUITABLE_NODE, + INVALID_CAPACITY + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java index 5bcb1065518..d4ca85f53cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java @@ -21,6 +21,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import java.io.Closeable; import 
java.util.List; @@ -127,7 +129,7 @@ enum NODESTATE { * @param datanodeID - datanode ID. * @return node stat if it is live/stale, null if it is dead or does't exist. */ - SCMNodeStat getNodeStat(DatanodeID datanodeID); + SCMNodeMetric getNodeStat(DatanodeID datanodeID); /** * Wait for the heartbeat is processed by NodeManager. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java index d6875f2a074..bda61f8ccd8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeStat.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.ozone.scm.node; +package org.apache.hadoop.ozone.scm.container.placement.metrics; import com.google.common.annotations.VisibleForTesting; @@ -28,28 +28,28 @@ interface NodeStat { * Get capacity of the node. * @return capacity of the node. */ - long getCapacity(); + LongMetric getCapacity(); /** * Get the used space of the node. * @return the used space of the node. */ - long getScmUsed(); + LongMetric getScmUsed(); /** * Get the remaining space of the node. * @return the remaining space of the node. */ - long getRemaining(); + LongMetric getRemaining(); /** * Set the total/used/remaining space. - * @param total - total space. + * @param capacity - total space. * @param used - used space. * @param remain - remaining space. */ @VisibleForTesting - void set(long total, long used, long remain); + void set(long capacity, long used, long remain); /** * Adding of the stat. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 4cf756b39cb..23e4e94ac66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.scm.node; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; +import java.util.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.collections.map.HashedMap; @@ -42,6 +42,8 @@ .proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport; import org.apache.hadoop.ozone.scm.VersionInfo; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.slf4j.Logger; @@ -165,7 +167,7 @@ public SCMNodeManager(OzoneConfiguration conf, String clusterID) executorService = HadoopExecutors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("SCM Heartbeat Processing Thread - %d").build()); - this.inManualChillMode = Optional.absent(); + this.inManualChillMode = Optional.empty(); Preconditions.checkState(heartbeatCheckerIntervalMs > 0); executorService.schedule(this, heartbeatCheckerIntervalMs, @@ -290,7 +292,7 @@ public boolean isOutOfNodeChillMode() { */ @Override public void clearChillModeFlag() { - this.inManualChillMode = Optional.absent(); + this.inManualChillMode = Optional.empty(); } /** @@ -601,8 +603,8 @@ private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) { List storageReports = 
nodeReport.getStorageReportList(); for (SCMStorageReport report : storageReports) { totalCapacity += report.getCapacity(); - totalRemaining += report.getRemaining(); - totalScmUsed += report.getScmUsed(); + totalRemaining += report.getRemaining(); + totalScmUsed+= report.getScmUsed(); } scmStat.subtract(stat); stat.set(totalCapacity, totalScmUsed, totalRemaining); @@ -768,8 +770,8 @@ public Map getNodeStats() { * @return node stat if it is live/stale, null if it is dead or does't exist. */ @Override - public SCMNodeStat getNodeStat(DatanodeID datanodeID) { - return nodeStats.get(datanodeID.getDatanodeUuid()); + public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { + return new SCMNodeMetric(nodeStats.get(datanodeID.getDatanodeUuid())); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java index 4c3395479a6..0e1c5f7b693 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodePoolManager.java @@ -46,7 +46,7 @@ import static org.apache.hadoop.ozone.scm .exceptions.SCMException.ResultCodes.FAILED_TO_LOAD_NODEPOOL; import static org.apache.hadoop.ozone.scm - .exceptions.SCMException.ResultCodes.NODE_NOT_FOUND_IN_NODEPOOL; + .exceptions.SCMException.ResultCodes.FAILED_TO_FIND_NODE_IN_POOL; import static org.apache.hadoop.scm .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; import static org.apache.hadoop.scm @@ -183,7 +183,7 @@ public void removeNode(final String pool, final DatanodeID node) if (kData == null) { throw new SCMException(String.format("Unable to find node %s from" + " pool %s in DB.", DFSUtil.bytes2String(kName), pool), - NODE_NOT_FOUND_IN_NODEPOOL); + FAILED_TO_FIND_NODE_IN_POOL); } nodePoolStore.delete(kName); @@ -194,7 +194,7 
@@ public void removeNode(final String pool, final DatanodeID node) } else { throw new SCMException(String.format("Unable to find node %s from" + " pool %s in MAP.", DFSUtil.bytes2String(kName), pool), - NODE_NOT_FOUND_IN_NODEPOOL); } + FAILED_TO_FIND_NODE_IN_POOL); } } finally { lock.writeLock().unlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java deleted file mode 100644 index 6089e4ed2bd..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeStat.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.scm.node; - -import com.google.common.annotations.VisibleForTesting; - -/** - * This class represents the SCM node stat. 
- */ -public final class SCMNodeStat implements NodeStat { - private long capacity; - private long scmUsed; - private long remaining; - - public SCMNodeStat() { - } - - public SCMNodeStat(final SCMNodeStat other) { - set(other.capacity, other.scmUsed, other.remaining); - } - - /** - * @return the total configured capacity of the node. - */ - @Override - public long getCapacity() { - return capacity; - } - - /** - * @return the total SCM used space on the node. - */ - @Override - public long getScmUsed() { - return scmUsed; - } - - /** - * @return the total remaining space available on the node. - */ - @Override - public long getRemaining() { - return remaining; - } - - @VisibleForTesting - @Override - public void set(final long total, final long used, final long remain) { - this.capacity = total; - this.scmUsed = used; - this.remaining = remain; - } - - @Override - public SCMNodeStat add(final NodeStat stat) { - this.capacity += stat.getCapacity(); - this.scmUsed += stat.getScmUsed(); - this.remaining += stat.getRemaining(); - return this; - } - - @Override - public SCMNodeStat subtract(final NodeStat stat) { - this.capacity -= stat.getCapacity(); - this.scmUsed -= stat.getScmUsed(); - this.remaining -= stat.getRemaining(); - return this; - } - - @Override - public boolean equals(final Object to) { - return this == to - || (to instanceof SCMNodeStat - && capacity == ((SCMNodeStat) to).getCapacity() - && scmUsed == ((SCMNodeStat) to).getScmUsed() - && remaining == ((SCMNodeStat) to).getRemaining()); - } - - @Override - public int hashCode() { - assert false : "hashCode not designed"; - return 42; // any arbitrary constant will do - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 8f338b90c9d..0398491cac4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java 
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.ozone; -import com.google.common.base.Optional; +import java.util.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; import org.apache.commons.io.FileUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -191,19 +190,16 @@ public OzoneClient createOzoneClient() throws OzoneException { * Waits for the Ozone cluster to be ready for processing requests. */ public void waitOzoneReady() throws TimeoutException, InterruptedException { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY) - >= numDataNodes) { - return true; - } - LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.", - scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY), - numDataNodes); - - return false; + GenericTestUtils.waitFor(() -> { + if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY) + >= numDataNodes) { + return true; } + LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.", + scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY), + numDataNodes); + + return false; }, 1000, 5 * 60 * 1000); //wait for 5 mins. } @@ -216,15 +212,12 @@ public Boolean get() { */ public void waitTobeOutOfChillMode() throws TimeoutException, InterruptedException { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - if (scm.getScmNodeManager().isOutOfNodeChillMode()) { - return true; - } - LOG.info("Waiting for cluster to be ready. No datanodes found"); - return false; + GenericTestUtils.waitFor(() -> { + if (scm.getScmNodeManager().isOutOfNodeChillMode()) { + return true; } + LOG.info("Waiting for cluster to be ready. 
No datanodes found"); + return false; }, 100, 45000); } @@ -234,7 +227,7 @@ public void waitForHeartbeatProcessed() throws TimeoutException, scm.getScmNodeManager().waitForHeartbeatProcessed(), 100, 4 * 1000); GenericTestUtils.waitFor(() -> - scm.getScmNodeManager().getStats().getCapacity() > 0, 100, + scm.getScmNodeManager().getStats().getCapacity().get() > 0, 100, 4 * 1000); } @@ -242,21 +235,20 @@ public void waitForHeartbeatProcessed() throws TimeoutException, * Builder for configuring the MiniOzoneCluster to run. */ public static class Builder - extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder { + extends MiniDFSCluster.Builder { private final OzoneConfiguration conf; - private final int defaultHBSeconds = 1; - private final int defaultProcessorMs = 100; + private static final int DEFAULT_HB_SECONDS = 1; + private static final int DEFAULT_PROCESSOR_MS = 100; private final String path; private final UUID runID; - private Optional ozoneHandlerType = Optional.absent(); + private Optional ozoneHandlerType = java.util.Optional.empty(); private Optional enableTrace = Optional.of(false); - private Optional hbSeconds = Optional.absent(); - private Optional hbProcessorInterval = Optional.absent(); - private Optional scmMetadataDir = Optional.absent(); + private Optional hbSeconds = Optional.empty(); + private Optional hbProcessorInterval = Optional.empty(); + private Optional scmMetadataDir = Optional.empty(); private Boolean ozoneEnabled = true; private Boolean waitForChillModeFinish = true; - private int containerWorkerThreadInterval = 1; private Boolean randomContainerPort = true; /** @@ -268,9 +260,6 @@ public Builder(OzoneConfiguration conf) { super(conf); this.conf = conf; - // TODO : Remove this later, with SCM, NN and SCM can run together. 
- //this.nnTopology(new MiniDFSNNTopology()); // No NameNode required - URL p = conf.getClass().getResource(""); path = p.getPath().concat(MiniOzoneCluster.class.getSimpleName() + UUID .randomUUID().toString()); @@ -329,11 +318,6 @@ public Builder doNotwaitTobeOutofChillMode() { return this; } - public Builder setSCMContainerWorkerThreadInterval(int intervalInSeconds) { - containerWorkerThreadInterval = intervalInSeconds; - return this; - } - public String getPath() { return path; } @@ -391,7 +375,7 @@ private void configScmMetadata() throws IOException { return; } - // If user has not specified a path, create a UUID for this miniCluser + // If user has not specified a path, create a UUID for this miniCluster // and create SCM under that directory. Path scmPath = Paths.get(path, runID.toString(), "scm"); Files.createDirectories(scmPath); @@ -417,9 +401,11 @@ private void configureTrace() { if (enableTrace.isPresent()) { conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY, enableTrace.get()); + GenericTestUtils.setLogLevel(org.apache.log4j.Logger.getRootLogger(), + Level.ALL); } GenericTestUtils.setLogLevel(org.apache.log4j.Logger.getRootLogger(), - Level.ALL); + Level.INFO); } private void configureSCMheartbeat() { @@ -429,7 +415,7 @@ private void configureSCMheartbeat() { } else { conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, - defaultHBSeconds); + DEFAULT_HB_SECONDS); } if (hbProcessorInterval.isPresent()) { @@ -437,7 +423,7 @@ private void configureSCMheartbeat() { hbProcessorInterval.get()); } else { conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, - defaultProcessorMs); + DEFAULT_PROCESSOR_MS); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java index 92154f5a869..e81fc0e55fb 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java @@ -19,8 +19,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.client.ContainerOperationClient; @@ -41,7 +41,7 @@ public class TestContainerOperations { private static ScmClient storageClient; - private static MiniOzoneCluster cluster;; + private static MiniOzoneCluster cluster; private static OzoneConfiguration ozoneConf; @BeforeClass diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java new file mode 100644 index 00000000000..d798c618453 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.placement; + +import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.scm.container.MockNodeManager; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom; +import org.apache.hadoop.ozone.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Asserts that allocation strategy works as expected. + */ +public class TestContainerPlacement { + + private DescriptiveStatistics computeStatistics(NodeManager nodeManager) { + DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics(); + for (DatanodeID id : nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY)) { + float weightedValue = + nodeManager.getNodeStat(id).get().getScmUsed().get() / (float) + nodeManager.getNodeStat(id).get().getCapacity().get(); + descriptiveStatistics.addValue(weightedValue); + } + return descriptiveStatistics; + } + + /** + * This test simulates lots of Cluster I/O and updates the metadata in SCM. + * We simulate adding and removing containers from the cluster. 
It asserts + * that our placement algorithm has taken the capacity of nodes into + * consideration by asserting that standard deviation of used space on these + * has improved. + */ + @Test + public void testCapacityPlacementYieldsBetterDataDistribution() throws + SCMException { + final int opsCount = 200 * 1000; + final int nodesRequired = 3; + Random random = new Random(); + + // The nature of init code in MockNodeManager yields similar clusters. + MockNodeManager nodeManagerCapacity = new MockNodeManager(true, 100); + MockNodeManager nodeManagerRandom = new MockNodeManager(true, 100); + DescriptiveStatistics beforeCapacity = + computeStatistics(nodeManagerCapacity); + DescriptiveStatistics beforeRandom = computeStatistics(nodeManagerRandom); + + //Assert that our initial layout of clusters are similar. + assertEquals(beforeCapacity.getStandardDeviation(), beforeRandom + .getStandardDeviation(), 0.001); + + SCMContainerPlacementCapacity capacityPlacer = new + SCMContainerPlacementCapacity(nodeManagerCapacity, new Configuration()); + SCMContainerPlacementRandom randomPlacer = new + SCMContainerPlacementRandom(nodeManagerRandom, new Configuration()); + + for (int x = 0; x < opsCount; x++) { + long containerSize = random.nextInt(100) * OzoneConsts.GB; + List nodesCapacity = + capacityPlacer.chooseDatanodes(nodesRequired, containerSize); + assertEquals(nodesRequired, nodesCapacity.size()); + + List nodesRandom = randomPlacer.chooseDatanodes(nodesRequired, + containerSize); + + // One fifth of all calls are delete + if (x % 5 == 0) { + deleteContainer(nodeManagerCapacity, nodesCapacity, containerSize); + deleteContainer(nodeManagerRandom, nodesRandom, containerSize); + } else { + createContainer(nodeManagerCapacity, nodesCapacity, containerSize); + createContainer(nodeManagerRandom, nodesRandom, containerSize); + } + } + DescriptiveStatistics postCapacity = computeStatistics(nodeManagerCapacity); + DescriptiveStatistics postRandom = 
computeStatistics(nodeManagerRandom); + + // This is a very bold claim, and needs large number of I/O operations. + // The claim in this assertion is that we improved the data distribution + // of this cluster in relation to the start state of the cluster. + Assert.assertTrue(beforeCapacity.getStandardDeviation() > + postCapacity.getStandardDeviation()); + + // This asserts that Capacity placement yields a better placement + // algorithm than random placement, since both cluster started at an + // identical state. + + Assert.assertTrue(postRandom.getStandardDeviation() > + postCapacity.getStandardDeviation()); + } + + private void deleteContainer(MockNodeManager nodeManager, + List nodes, long containerSize) { + for (DatanodeID id : nodes) { + nodeManager.delContainer(id, containerSize); + } + } + + private void createContainer(MockNodeManager nodeManager, + List nodes, long containerSize) { + for (DatanodeID id : nodes) { + nodeManager.addContainer(id, containerSize); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestDatanodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestDatanodeMetrics.java new file mode 100644 index 00000000000..ccc23c32243 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/placement/TestDatanodeMetrics.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.placement; + +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests that test Metrics that support placement. + */ +public class TestDatanodeMetrics { + @Rule + public ExpectedException exception = ExpectedException.none(); + @Test + public void testSCMNodeMetric() { + SCMNodeStat stat = new SCMNodeStat(100L, 10L, 90L); + assertEquals((long) stat.getCapacity().get(), 100L); + assertEquals((long) stat.getScmUsed().get(), 10L); + assertEquals((long) stat.getRemaining().get(), 90L); + SCMNodeMetric metric = new SCMNodeMetric(stat); + + SCMNodeStat newStat = new SCMNodeStat(100L, 10L, 90L); + assertEquals((long) stat.getCapacity().get(), 100L); + assertEquals((long) stat.getScmUsed().get(), 10L); + assertEquals((long) stat.getRemaining().get(), 90L); + + SCMNodeMetric newMetric = new SCMNodeMetric(newStat); + assertTrue(metric.isEqual(newMetric.get())); + + newMetric.add(stat); + assertTrue(newMetric.isGreater(metric.get())); + + SCMNodeMetric zeroMetric = new SCMNodeMetric(new SCMNodeStat()); + // Assert we can handle zero capacity. 
+ assertTrue(metric.isGreater(zeroMetric.get())); + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java index 8a8ea68daef..a30707c2ef2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java @@ -22,8 +22,8 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java index 14c36fc4413..638de9ef586 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -18,6 +18,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.ozone.protocol.VersionResponse; import 
org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -25,9 +26,9 @@ .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; - +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.node.SCMNodeStat; import java.io.IOException; import java.util.HashMap; @@ -39,21 +40,55 @@ * Test Helper for testing container Mapping. */ public class MockNodeManager implements NodeManager { - private final List healthyNodes; private static final int HEALTHY_NODE_COUNT = 10; + private final static NodeData[] NODES = { + new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB), + new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB), + new NodeData(128L * OzoneConsts.TB, 256 * OzoneConsts.GB), + new NodeData(40L * OzoneConsts.TB, OzoneConsts.TB), + new NodeData(256L * OzoneConsts.TB, 200 * OzoneConsts.TB), + new NodeData(20L * OzoneConsts.TB, 10 * OzoneConsts.GB), + new NodeData(32L * OzoneConsts.TB, 16 * OzoneConsts.TB), + new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB), + }; + private final List healthyNodes; + private final Map nodeMetricMap; + private final SCMNodeStat aggregateStat; private boolean chillmode; - public MockNodeManager() { + public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); - for (int x = 0; x < 10; x++) { - healthyNodes.add(SCMTestUtils.getDatanodeID()); + this.nodeMetricMap = new HashMap<>(); + aggregateStat = new SCMNodeStat(); + if (initializeFakeNodes) { + for (int x = 0; x < nodeCount; x++) { + DatanodeID id = SCMTestUtils.getDatanodeID(); + healthyNodes.add(id); + populateNodeMetric(id, x); + } } chillmode = false; } + /** + * Invoked from ctor to create some node Metrics. 
+ * + * @param datanodeID - Datanode ID + */ + private void populateNodeMetric(DatanodeID datanodeID, int x) { + SCMNodeStat newStat = new SCMNodeStat(); + long remaining = + NODES[x % NODES.length].capacity - NODES[x % NODES.length].used; + newStat.set( + (NODES[x % NODES.length].capacity), + (NODES[x % NODES.length].used), remaining); + this.nodeMetricMap.put(datanodeID.toString(), newStat); + aggregateStat.add(newStat); + } + /** * Sets the chill mode value. - * @param chillmode boolean + * @param chillmode boolean */ public void setChillmode(boolean chillmode) { this.chillmode = chillmode; @@ -184,7 +219,7 @@ public boolean isInManualChillMode() { */ @Override public SCMNodeStat getStats() { - return null; + return aggregateStat; } /** @@ -193,7 +228,7 @@ public SCMNodeStat getStats() { */ @Override public Map getNodeStats() { - return null; + return nodeMetricMap; } /** @@ -202,8 +237,8 @@ public Map getNodeStats() { * @return node stat if it is live/stale, null if it is dead or does't exist. 
*/ @Override - public SCMNodeStat getNodeStat(DatanodeID datanodeID) { - return null; + public SCMNodeMetric getNodeStat(DatanodeID datanodeID) { + return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString())); } /** @@ -283,15 +318,103 @@ public SCMCommand register(DatanodeID datanodeID) { @Override public List sendHeartbeat(DatanodeID datanodeID, SCMNodeReport nodeReport) { + if ((datanodeID != null) && (nodeReport != null) && (nodeReport + .getStorageReportCount() > 0)) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); + + long totalCapacity = 0L; + long totalRemaining = 0L; + long totalScmUsed = 0L; + List + storageReports = nodeReport.getStorageReportList(); + for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report : + storageReports) { + totalCapacity += report.getCapacity(); + totalRemaining +=report.getRemaining(); + totalScmUsed += report.getScmUsed(); + } + aggregateStat.subtract(stat); + stat.set(totalCapacity, totalScmUsed, totalRemaining); + aggregateStat.add(stat); + nodeMetricMap.put(datanodeID.toString(), stat); + + } return null; } @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); - for(NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) { + for (NodeManager.NODESTATE state : NodeManager.NODESTATE.values()) { nodeCountMap.put(state.toString(), getNodeCount(state)); } return nodeCountMap; } + + /** + * Makes it easy to add a container. + * + * @param datanodeID datanode ID + * @param size number of bytes. + */ + public void addContainer(DatanodeID datanodeID, long size) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); + if (stat != null) { + aggregateStat.subtract(stat); + stat.getCapacity().add(size); + aggregateStat.add(stat); + nodeMetricMap.put(datanodeID.toString(), stat); + } + } + + /** + * Makes it easy to simulate a delete of a container. + * + * @param datanodeID datanode ID + * @param size number of bytes. 
+ */ + public void delContainer(DatanodeID datanodeID, long size) { + SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString()); + if (stat != null) { + aggregateStat.subtract(stat); + stat.getCapacity().subtract(size); + aggregateStat.add(stat); + nodeMetricMap.put(datanodeID.toString(), stat); + } + } + + /** + * A class to declare some values for the nodes so that our tests + * won't fail. + */ + private static class NodeData { + private long capacity, used; + + /** + * Constructs a nodeDefinition. + * + * @param capacity capacity. + * @param used used. + */ + NodeData(long capacity, long used) { + this.capacity = capacity; + this.used = used; + } + + public long getCapacity() { + return capacity; + } + + public void setCapacity(long capacity) { + this.capacity = capacity; + } + + public long getUsed() { + return used; + } + + public void setUsed(long used) { + this.used = used; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java index d83053aa9ca..0b9b76f365d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java @@ -59,15 +59,17 @@ public static void setUp() throws Exception { testDir = Paths.get(path).toFile(); boolean folderExisted = testDir.exists() || testDir.mkdirs(); if (!folderExisted) { - throw new IOException("Unable to create test diectory path"); + throw new IOException("Unable to create test directory path"); } - nodeManager = new MockNodeManager(); + nodeManager = new MockNodeManager(true, 10); mapping = new ContainerMapping(conf, nodeManager, 128); } @AfterClass public static void cleanup() throws IOException { - mapping.close(); + if(mapping != null) { + 
mapping.close(); + } FileUtil.fullyDelete(testDir); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java index 02f1486a376..d28866e9533 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,10 +26,11 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.SCMTestUtils; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.scm.container.ContainerMapping; -import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.client.ScmClient; import org.apache.hadoop.scm.container.common.helpers.Pipeline; @@ -93,6 +94,7 @@ ContainerMapping createContainerManager(Configuration config, return new ContainerMapping(config, scmNodeManager, cacheSize); } + /** * Test capacity 
based container placement policy with node reports. * @@ -122,7 +124,7 @@ public void testContainerPlacementCapacity() throws IOException, List datanodes = SCMTestUtils.getRegisteredDatanodeIDs(nodeManager, nodeCount); try { - for (DatanodeID datanodeID: datanodes) { + for (DatanodeID datanodeID : datanodes) { StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder(); StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb = @@ -139,11 +141,11 @@ public void testContainerPlacementCapacity() throws IOException, 100, 4 * 1000); assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); assertEquals(capacity * nodeCount, - nodeManager.getStats().getCapacity()); + (long) nodeManager.getStats().getCapacity().get()); assertEquals(used * nodeCount, - nodeManager.getStats().getScmUsed()); + (long) nodeManager.getStats().getScmUsed().get()); assertEquals(remaining * nodeCount, - nodeManager.getStats().getRemaining()); + (long) nodeManager.getStats().getRemaining().get()); assertTrue(nodeManager.isOutOfNodeChillMode()); @@ -155,7 +157,7 @@ public void testContainerPlacementCapacity() throws IOException, final long newUsed = 7L * OzoneConsts.GB; final long newRemaining = capacity - newUsed; - for (DatanodeID datanodeID: datanodes) { + for (DatanodeID datanodeID : datanodes) { StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb = StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder(); StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb = @@ -168,14 +170,14 @@ public void testContainerPlacementCapacity() throws IOException, nrb.addStorageReport(srb).build()); } - GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining() == - nodeCount * newRemaining, + GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining() + .get() == nodeCount * newRemaining, 100, 4 * 1000); thrown.expect(IOException.class); thrown.expectMessage( - 
startsWith("No healthy node found with enough remaining capacity to" + - " allocate container.")); + startsWith("Unable to find enough nodes that meet the space " + + "requirement in healthy node set.")); String container2 = UUID.randomUUID().toString(); containerManager.allocateContainer(container2, ScmClient.ReplicationFactor.THREE); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index da567cbdcdb..25ef7cc981a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,6 +26,7 @@ .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMStorageReport; +import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; @@ -45,14 +46,14 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD; +import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; +import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS; import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; -import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.StringStartsWith.startsWith; @@ -158,7 +159,7 @@ public void testScmNoHeartbeats() throws IOException, GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); assertFalse("No heartbeats, Node manager should have been in" + - " chill mode.", 
nodeManager.isOutOfNodeChillMode()); + " chill mode.", nodeManager.isOutOfNodeChillMode()); } } @@ -208,7 +209,7 @@ public void testScmSameNodeHeartbeats() throws IOException, GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); assertFalse("Not enough nodes have send heartbeat to node" + - "manager.", nodeManager.isOutOfNodeChillMode()); + "manager.", nodeManager.isOutOfNodeChillMode()); } } @@ -631,11 +632,12 @@ public void testScmClusterIsInExpectedState1() throws IOException, * @throws InterruptedException */ private void heartbeatNodeSet(SCMNodeManager manager, List list, - int sleepDuration) throws InterruptedException { + int sleepDuration) throws InterruptedException { while (!Thread.currentThread().isInterrupted()) { for (DatanodeID dn : list) { manager.sendHeartbeat(dn, null); - } Thread.sleep(sleepDuration); + } + Thread.sleep(sleepDuration); } } @@ -664,7 +666,7 @@ private List createNodeSet(SCMNodeManager nodeManager, int * @return true if we found the expected number. */ private boolean findNodes(NodeManager nodeManager, int count, - NodeManager.NODESTATE state) { + NodeManager.NODESTATE state) { return count == nodeManager.getNodeCount(state); } @@ -690,7 +692,6 @@ public void testScmClusterIsInExpectedState2() throws IOException, conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000); - try (SCMNodeManager nodeManager = createNodeManager(conf)) { List healthyNodeList = createNodeSet(nodeManager, healthyCount, "Healthy"); @@ -902,7 +903,7 @@ public void testScmEnterAndExitChillMode() throws IOException, // Assert that node manager force enter cannot be overridden by nodes HBs. 
- for(int x= 0; x < 20; x++) { + for (int x = 0; x < 20; x++) { DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanode, null); } @@ -952,12 +953,12 @@ public void testScmStatsFromNodeReport() throws IOException, GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(), 100, 4 * 1000); assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY)); - assertEquals(capacity * nodeCount, - nodeManager.getStats().getCapacity()); - assertEquals(used * nodeCount, - nodeManager.getStats().getScmUsed()); - assertEquals(remaining * nodeCount, - nodeManager.getStats().getRemaining()); + assertEquals(capacity * nodeCount, (long) nodeManager.getStats() + .getCapacity().get()); + assertEquals(used * nodeCount, (long) nodeManager.getStats() + .getScmUsed().get()); + assertEquals(remaining * nodeCount, (long) nodeManager.getStats() + .getRemaining().get()); } } @@ -998,31 +999,41 @@ public void testScmNodeReportUpdate() throws IOException, Thread.sleep(100); } - final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount -1); - final long expectedRemaining = capacity - - usedPerHeartbeat * (heartbeatCount - 1); + final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1); + final long expectedRemaining = capacity - expectedScmUsed; GenericTestUtils.waitFor( - () -> nodeManager.getStats().getScmUsed() == expectedScmUsed, 100, - 4 * 1000); + () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, + 100, 4 * 1000); - assertEquals(capacity, nodeManager.getStats().getCapacity()); - assertEquals(expectedScmUsed, nodeManager.getStats().getScmUsed()); - assertEquals(expectedRemaining, nodeManager.getStats().getRemaining()); + long foundCapacity = nodeManager.getStats().getCapacity().get(); + assertEquals(capacity, foundCapacity); + + long foundScmUsed = nodeManager.getStats().getScmUsed().get(); + assertEquals(expectedScmUsed, foundScmUsed); + + long foundRemaining = nodeManager.getStats().getRemaining().get(); 
+ assertEquals(expectedRemaining, foundRemaining); // Test NodeManager#getNodeStats assertEquals(nodeCount, nodeManager.getNodeStats().size()); - assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity()); - assertEquals(expectedScmUsed, - nodeManager.getNodeStat(datanodeID).getScmUsed()); - assertEquals(expectedRemaining, - nodeManager.getNodeStat(datanodeID).getRemaining()); + long nodeCapacity = nodeManager.getNodeStat(datanodeID).get() + .getCapacity().get(); + assertEquals(capacity, nodeCapacity); + + foundScmUsed = nodeManager.getNodeStat(datanodeID).get().getScmUsed() + .get(); + assertEquals(expectedScmUsed, foundScmUsed); + + foundRemaining = nodeManager.getNodeStat(datanodeID).get() + .getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); // Compare the result from // NodeManager#getNodeStats and NodeManager#getNodeStat SCMNodeStat stat1 = nodeManager.getNodeStats(). get(datanodeID.getDatanodeUuid()); - SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeID); + SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeID).get(); assertEquals(stat1, stat2); // Wait up to 4s so that the node becomes stale @@ -1031,11 +1042,17 @@ public void testScmNodeReportUpdate() throws IOException, () -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100, 4 * 1000); assertEquals(nodeCount, nodeManager.getNodeStats().size()); - assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity()); - assertEquals(expectedScmUsed, - nodeManager.getNodeStat(datanodeID).getScmUsed()); - assertEquals(expectedRemaining, - nodeManager.getNodeStat(datanodeID).getRemaining()); + + foundCapacity = nodeManager.getNodeStat(datanodeID).get() + .getCapacity().get(); + assertEquals(capacity, foundCapacity); + foundScmUsed = nodeManager.getNodeStat(datanodeID).get() + .getScmUsed().get(); + assertEquals(expectedScmUsed, foundScmUsed); + + foundRemaining = nodeManager.getNodeStat(datanodeID).get(). 
+ getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); // Wait up to 4 more seconds so the node becomes dead // Verify usage info should be updated. @@ -1044,11 +1061,16 @@ public void testScmNodeReportUpdate() throws IOException, 4 * 1000); assertEquals(0, nodeManager.getNodeStats().size()); - assertEquals(0, nodeManager.getStats().getCapacity()); - assertEquals(0, nodeManager.getStats().getScmUsed()); - assertEquals(0, nodeManager.getStats().getRemaining()); + foundCapacity = nodeManager.getStats().getCapacity().get(); + assertEquals(0, foundCapacity); - // Send a new report to bring the dead node back to healty + foundScmUsed = nodeManager.getStats().getScmUsed().get(); + assertEquals(0, foundScmUsed); + + foundRemaining = nodeManager.getStats().getRemaining().get(); + assertEquals(0, foundRemaining); + + // Send a new report to bring the dead node back to healthy SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder(); SCMStorageReport.Builder srb = SCMStorageReport.newBuilder(); srb.setStorageUuid(UUID.randomUUID().toString()); @@ -1063,14 +1085,18 @@ public void testScmNodeReportUpdate() throws IOException, () -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1, 100, 5 * 1000); GenericTestUtils.waitFor( - () -> nodeManager.getStats().getScmUsed() == expectedScmUsed, 100, - 4 * 1000); + () -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, + 100, 4 * 1000); assertEquals(nodeCount, nodeManager.getNodeStats().size()); - assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity()); - assertEquals(expectedScmUsed, - nodeManager.getNodeStat(datanodeID).getScmUsed()); - assertEquals(expectedRemaining, - nodeManager.getNodeStat(datanodeID).getRemaining()); + foundCapacity = nodeManager.getNodeStat(datanodeID).get() + .getCapacity().get(); + assertEquals(capacity, foundCapacity); + foundScmUsed = nodeManager.getNodeStat(datanodeID).get().getScmUsed() + .get(); + assertEquals(expectedScmUsed, foundScmUsed); + 
foundRemaining = nodeManager.getNodeStat(datanodeID).get() + .getRemaining().get(); + assertEquals(expectedRemaining, foundRemaining); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java index 365554437d3..2fd2c8303bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodePoolManager.java @@ -24,8 +24,8 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.container.common.SCMTestUtils; -import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy; -import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy; +import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.test.PathUtils; import org.junit.Rule;