HDFS-11564. Ozone: SCM: Add Comparable Metric Support. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2017-04-03 13:33:11 -07:00 committed by Owen O'Malley
parent 68da45a789
commit 82d814fa79
30 changed files with 1595 additions and 616 deletions

View File

@ -23,11 +23,14 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,7 +42,6 @@
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.iq80.leveldb.Options;
/**
* Mapping class contains the mapping from a name to a pipeline mapping. This is
@ -94,7 +96,6 @@ public ContainerMapping(final Configuration conf,
this.containerSize = OzoneConsts.GB * conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
}
@ -105,6 +106,7 @@ public ContainerMapping(final Configuration conf,
* @param conf - configuration.
* @return SCM container placement policy implementation instance.
*/
@SuppressWarnings("unchecked")
private static ContainerPlacementPolicy createContainerPlacementPolicy(
final NodeManager nodeManager, final Configuration conf) {
Class<? extends ContainerPlacementPolicy> implClass =
@ -123,8 +125,11 @@ private static ContainerPlacementPolicy createContainerPlacementPolicy(
throw new RuntimeException(implClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
LOG.error("Unhandled exception occured, Placement policy will not be " +
"functional.");
throw new IllegalArgumentException("Unable to load " +
"ContainerPlacementPolicy", e);
}
return null;
}
/**
@ -148,7 +153,6 @@ private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
return pipeline;
}
/**
* Returns the Pipeline from the container name.
*
@ -157,7 +161,7 @@ private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
*/
@Override
public Pipeline getContainer(final String containerName) throws IOException {
Pipeline pipeline = null;
Pipeline pipeline;
lock.lock();
try {
byte[] pipelineBytes =

View File

@ -15,7 +15,7 @@
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
package org.apache.hadoop.ozone.scm.container.placement.algorithms;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

View File

@ -1,207 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Math.abs;
/**
* Container placement policy that randomly choose datanodes with remaining
* space satisfy the size constraints.
*/
public final class SCMContainerPlacementCapacity
implements ContainerPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
private static int maxRetry = 100;
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;
public SCMContainerPlacementCapacity(final NodeManager nodeManager,
final Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
}
@Override
public List<DatanodeID> chooseDatanodes(final int nodesRequired,
final long sizeRequired) throws IOException {
List<DatanodeID> healthyNodes =
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
if (healthyNodes.size() == 0) {
throw new IOException("No healthy node found to allocate container.");
}
if (healthyNodes.size() < nodesRequired) {
throw new IOException("Not enough nodes to allocate container with " +
nodesRequired + " datanodes required.");
}
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
// TODO: add allocation time as metrics
long beginTime = Time.monotonicNow();
Set<DatanodeID> results = new HashSet<>();
for (int i = 0; i < nodesRequired; i++) {
DatanodeID candidate = chooseNode(results, healthyNodes, sizeRequired);
if (candidate != null) {
results.add(candidate);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding datanode {}. Results.size = {} nodesRequired = {}",
candidate, results.size(), nodesRequired);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Can't find datanode. Results.size = {} nodesRequired = {}",
results.size(), nodesRequired);
}
break;
}
}
if (LOG.isTraceEnabled()) {
long endTime = Time.monotonicNow();
LOG.trace("SCMContainerPlacementCapacity takes {} ms to choose nodes.",
endTime - beginTime);
}
// TODO: handle under replicated case.
// For now, throw exception only when we can't find any datanode.
if (results.size() == 0) {
throw new IOException("No healthy node found " +
"with enough remaining capacity to allocate container.");
}
if (results.size() != nodesRequired) {
if (LOG.isDebugEnabled()) {
LOG.debug("SCMContainerPlacementCapacity cannot find enough healthy" +
" datanodes with remaining capacity > {} ." +
"(nodesRequired = {}, nodesFound = {})", sizeRequired,
nodesRequired, results.size());
}
}
return results.stream().collect(Collectors.toList());
}
/**
* Choose one random node from 2-Random nodes that satisfy the size required.
* @param results - set of current chosen datanodes.
* @param healthyNodes - all healthy datanodes.
* @param sizeRequired - size required for container.
* @return one with larger remaining capacity from two randomly chosen
* datanodes that satisfy sizeRequirement but are not in current
* result set.
*/
private DatanodeID chooseNode(final Set results,
final List<DatanodeID> healthyNodes, final long sizeRequired) {
NodeAndStat firstNode = chooseOneNode(results, healthyNodes,
sizeRequired);
if (firstNode == null) {
return null;
}
NodeAndStat secondNode = chooseOneNode(results, healthyNodes,
sizeRequired);
if (secondNode == null) {
return firstNode.getDatanodeID();
}
// Pick one with larger remaining space.
return firstNode.getDatanodeStat().getRemaining() >
secondNode.getDatanodeStat().getRemaining() ?
firstNode.getDatanodeID() : secondNode.getDatanodeID();
}
/**
* Choose one random node from healthy nodes that satisfies the size
* requirement and has not been chosen in the existing results.
* Retry up to maxRetry(100) times.
* @param results - set of current chosen datanodes.
* @param healthyNodes - all healthy datanodes.
* @param sizeRequired - size required for container.
* @return one with larger remaining capacity from two randomly chosen
* datanodes that satisfy sizeRequirement but are not in current
* result set.
*/
private NodeAndStat chooseOneNode(final Set<DatanodeID> results,
final List<DatanodeID> healthyNodes, final long sizeRequired) {
NodeAndStat selectedNode = null;
int retry = 0;
while (selectedNode == null && retry < maxRetry) {
int candidateIdx = abs(rand.nextInt() % healthyNodes.size());
DatanodeID candidate = healthyNodes.get(candidateIdx);
if (!results.contains(candidate)) {
SCMNodeStat stat = nodeManager.getNodeStat(candidate);
if (stat != null && stat.getRemaining() > sizeRequired) {
selectedNode = new NodeAndStat(candidate, stat);
break;
}
}
retry++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Find {} after {} retries!", (selectedNode != null) ?
selectedNode.getDatanodeID() : "no datanode", retry);
}
return selectedNode;
}
/**
* Helper class wraps DatanodeID and SCMNodeStat.
*/
static class NodeAndStat {
private final DatanodeID datanodeID;
private final SCMNodeStat stat;
NodeAndStat(final DatanodeID id, final SCMNodeStat stat) {
this.datanodeID = id;
this.stat = stat;
}
public DatanodeID getDatanodeID() {
return datanodeID;
}
public SCMNodeStat getDatanodeStat() {
return stat;
}
}
}

View File

@ -1,146 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Math.abs;
/**
* Container placement policy that randomly chooses healthy datanodes.
*/
public final class SCMContainerPlacementRandom
implements ContainerPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
private static int maxRetry = 100;
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;
public SCMContainerPlacementRandom(final NodeManager nodeManager,
final Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
}
@Override
public List<DatanodeID> chooseDatanodes(final int nodesRequired,
final long sizeRequired) throws IOException {
List<DatanodeID> healthyNodes =
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
if (healthyNodes.size() == 0) {
throw new IOException("No healthy node found to allocate container.");
}
if (healthyNodes.size() < nodesRequired) {
throw new IOException("Not enough nodes to allocate container with "
+ nodesRequired + " datanodes required.");
}
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
// TODO: add allocation time as metrics
long beginTime = Time.monotonicNow();
Set<DatanodeID> results = new HashSet<>();
for (int i = 0; i < nodesRequired; i++) {
DatanodeID candidate = chooseNode(results, healthyNodes);
if (candidate != null) {
results.add(candidate);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding datanode {}. Results.size = {} nodesRequired = {}",
candidate, results.size(), nodesRequired);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Can't find datanode. Results.size = {} nodesRequired = {}",
results.size(), nodesRequired);
}
break;
}
}
if (LOG.isTraceEnabled()) {
long endTime = Time.monotonicNow();
LOG.trace("SCMContainerPlacementRandom takes {} ms to choose nodes.",
endTime - beginTime);
}
if (results.size() != nodesRequired) {
if (LOG.isDebugEnabled()) {
LOG.debug("SCMContainerPlacementRandom cannot find enough healthy" +
" datanodes. (nodesRequired = {}, nodesFound = {})",
nodesRequired, results.size());
}
}
return results.stream().collect(Collectors.toList());
}
/**
* Choose one random node from 2-Random nodes. Retry up to 100 times until
* find one that has not been chosen in the exising results.
* @param results - set of current chosen datanodes.
* @param healthyNodes - all healthy datanodes.
* @return one randomly chosen datanode that from two randomly chosen datanode
* that are not in current result set.
*/
private DatanodeID chooseNode(final Set<DatanodeID> results,
final List<DatanodeID> healthyNodes) {
DatanodeID selectedNode = null;
int retry = 0;
while (selectedNode == null && retry < maxRetry) {
DatanodeID firstNode = healthyNodes.get(
abs(rand.nextInt() % healthyNodes.size()));
DatanodeID secondNode = healthyNodes.get(
abs(rand.nextInt() % healthyNodes.size()));
// Randomly pick one from two candidates.
selectedNode = rand.nextBoolean() ? firstNode : secondNode;
if (results.contains(selectedNode)) {
selectedNode = null;
} else {
break;
}
retry++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Find {} after {} retries!", (selectedNode != null) ?
selectedNode : "no datanode", retry);
}
return selectedNode;
}
}

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.algorithms;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
/**
* SCM CommonPolicy implements a set of invariants which are common
* for all container placement policies, acts as the repository of helper
* functions which are common to placement policies.
*/
public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMCommonPolicy.class);
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;
/**
* Constructs SCM Common Policy Class.
*
* @param nodeManager NodeManager
* @param conf Configuration class.
*/
public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
}
/**
* Return node manager.
*
* @return node manager
*/
public NodeManager getNodeManager() {
return nodeManager;
}
/**
* Returns the Random Object.
*
* @return rand
*/
public Random getRand() {
return rand;
}
/**
* Get Config.
*
* @return Configuration
*/
public Configuration getConf() {
return conf;
}
/**
* Given the replication factor and size required, return set of datanodes
* that satisfy the nodes and size requirement.
* <p>
* Here are some invariants of container placement.
* <p>
* 1. We place containers only on healthy nodes.
* 2. We place containers on nodes with enough space for that container.
* 3. if a set of containers are requested, we either meet the required
* number of nodes or we fail that request.
*
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return list of datanodes chosen.
* @throws SCMException SCM exception.
*/
public List<DatanodeID> chooseDatanodes(int nodesRequired, final long
sizeRequired) throws SCMException {
List<DatanodeID> healthyNodes =
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
String msg;
if (healthyNodes.size() == 0) {
msg = "No healthy node found to allocate container.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}
if (healthyNodes.size() < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate container. %d "
+ " datanodes required. Found %d",
nodesRequired, healthyNodes.size());
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
List<DatanodeID> healthyList = healthyNodes.stream().filter(d ->
hasEnoughSpace(d, sizeRequired)).collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet the space " +
"requirement in healthy node set. Nodes required: %d Found: %d",
nodesRequired, healthyList.size());
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_NODES_WITH_SPACE);
}
return healthyList;
}
/**
* Returns true if this node has enough space to meet our requirement.
*
* @param datanodeID DatanodeID
* @return true if we have enough space.
*/
private boolean hasEnoughSpace(DatanodeID datanodeID, long sizeRequired) {
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeID);
return (nodeMetric != null) && nodeMetric.get().getRemaining()
.hasResources(sizeRequired);
}
/**
* This function invokes the derived classes chooseNode Function to build a
* list of nodes. Then it verifies that invoked policy was able to return
* expected number of nodes.
*
* @param nodesRequired - Nodes Required
* @param healthyNodes - List of Nodes in the result set.
* @return List of Datanodes that can be used for placement.
* @throws SCMException
*/
public List<DatanodeID> getResultSet(int nodesRequired, List<DatanodeID>
healthyNodes) throws SCMException {
List<DatanodeID> results = new LinkedList<>();
for (int x = 0; x < nodesRequired; x++) {
// invoke the choose function defined in the derived classes.
DatanodeID nodeId = chooseNode(healthyNodes);
if (nodeId != null) {
results.add(nodeId);
}
}
if (results.size() < nodesRequired) {
LOG.error("Unable to find the required number of healthy nodes that " +
"meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
return results;
}
/**
* Choose a datanode according to the policy, this function is implemented
* by the actual policy class. For example, PlacementCapacity or
* PlacementRandom.
*
* @param healthyNodes - Set of healthy nodes we can choose from.
* @return DatanodeID
*/
public abstract DatanodeID chooseNode(List<DatanodeID> healthyNodes);
}

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.algorithms;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Container placement policy that randomly choose datanodes with remaining
* space to satisfy the size constraints.
* <p>
* The Algorithm is as follows, Pick 2 random nodes from a given pool of nodes
* and then pick the node which lower utilization. This leads to a higher
* probability of nodes with lower utilization to be picked.
* <p>
* For those wondering why we choose two nodes randomly and choose the node
* with lower utilization. There are links to this original papers in
* HDFS-11564.
* <p>
* A brief summary -- We treat the nodes from a scale of lowest utilized to
* highest utilized, there are (s * ( s + 1)) / 2 possibilities to build
* distinct pairs of nodes. There are s - k pairs of nodes in which the rank
* k node is less than the couple. So probability of a picking a node is
* (2 * (s -k)) / (s * (s - 1)).
* <p>
* In English, There is a much higher probability of picking less utilized nodes
* as compared to nodes with higher utilization since we pick 2 nodes and
* then pick the node with lower utilization.
* <p>
* This avoids the issue of users adding new nodes into the cluster and HDFS
* sending all traffic to those nodes if we only use a capacity based
* allocation scheme. Unless those nodes are part of the set of the first 2
* nodes then newer nodes will not be in the running to get the container.
* <p>
* This leads to an I/O pattern where the lower utilized nodes are favoured
* more than higher utilized nodes, but part of the I/O will still go to the
* older higher utilized nodes.
* <p>
* With this algorithm in place, our hope is that balancer tool needs to do
* little or no work and the cluster will achieve a balanced distribution
* over time.
*/
public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
/**
* Constructs a Container Placement with considering only capacity.
* That is this policy tries to place containers based on node weight.
*
* @param nodeManager Node Manager
* @param conf Configuration
*/
public SCMContainerPlacementCapacity(final NodeManager nodeManager,
final Configuration conf) {
super(nodeManager, conf);
}
/**
* Called by SCM to choose datanodes.
*
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return List of datanodes.
* @throws SCMException SCMException
*/
@Override
public List<DatanodeID> chooseDatanodes(final int nodesRequired,
final long sizeRequired) throws SCMException {
List<DatanodeID> healthyNodes =
super.chooseDatanodes(nodesRequired, sizeRequired);
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
return getResultSet(nodesRequired, healthyNodes);
}
/**
* Find a node from the healthy list and return it after removing it from the
* list that we are operating on.
*
* @param healthyNodes - List of healthy nodes that meet the size
* requirement.
* @return DatanodeID that is chosen.
*/
@Override
public DatanodeID chooseNode(List<DatanodeID> healthyNodes) {
int firstNodeNdx = getRand().nextInt(healthyNodes.size());
int secondNodeNdx = getRand().nextInt(healthyNodes.size());
// There is a possibility that both numbers will be same.
// if that is so, we just return the node.
if (firstNodeNdx == secondNodeNdx) {
return healthyNodes.get(firstNodeNdx);
}
DatanodeID firstNodeID = healthyNodes.get(firstNodeNdx);
DatanodeID secondNodeID = healthyNodes.get(secondNodeNdx);
SCMNodeMetric firstNodeMetric = getNodeManager().getNodeStat(firstNodeID);
SCMNodeMetric secondNodeMetric = getNodeManager().getNodeStat(secondNodeID);
DatanodeID chosenID = firstNodeMetric.isGreater(secondNodeMetric.get())
? firstNodeID : secondNodeID;
healthyNodes.remove(chosenID);
return chosenID;
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.algorithms;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Container placement policy that randomly chooses healthy datanodes.
* This is very similar to current HDFS placement. That is we
* just randomly place containers without any considerations of utilization.
* <p>
* That means we rely on balancer to achieve even distribution of data.
* Balancer will need to support containers as a feature before this class
* can be practically used.
*/
public final class SCMContainerPlacementRandom extends SCMCommonPolicy
implements ContainerPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
/**
* Construct a random Block Placement policy.
*
* @param nodeManager nodeManager
* @param conf Config
*/
public SCMContainerPlacementRandom(final NodeManager nodeManager,
final Configuration conf) {
super(nodeManager, conf);
}
/**
* Choose datanodes called by the SCM to choose the datanode.
*
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return List of Datanodes.
* @throws SCMException SCMException
*/
@Override
public List<DatanodeID> chooseDatanodes(final int nodesRequired,
final long sizeRequired) throws SCMException {
List<DatanodeID> healthyNodes =
super.chooseDatanodes(nodesRequired, sizeRequired);
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
return getResultSet(nodesRequired, healthyNodes);
}
/**
* Just chose a node randomly and remove it from the set of nodes we can
* chose from.
*
* @param healthyNodes - all healthy datanodes.
* @return one randomly chosen datanode that from two randomly chosen datanode
*/
public DatanodeID chooseNode(final List<DatanodeID> healthyNodes) {
DatanodeID selectedNode =
healthyNodes.get(getRand().nextInt(healthyNodes.size()));
healthyNodes.remove(selectedNode);
return selectedNode;
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.algorithms;
// Various placement algorithms.

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.metrics;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
/**
* DatanodeMetric acts as the basis for all the metric that is used in
* comparing 2 datanodes.
*/
public interface DatanodeMetric<T, S> extends Comparable<T> {
/**
* Some syntactic sugar over Comparable interface. This makes code easier to
* read.
*
* @param o - Other Object
* @return - True if *this* object is greater than argument.
*/
boolean isGreater(T o);
/**
* Inverse of isGreater.
*
* @param o - other object.
* @return True if *this* object is Lesser than argument.
*/
boolean isLess(T o);
/**
* Returns true if the object has same values. Because of issues with
* equals, and loss of type information this interface supports isEqual.
*
* @param o object to compare.
* @return True, if the values match.
*/
boolean isEqual(T o);
/**
* A resourceCheck, defined by resourceNeeded.
* For example, S could be bytes required
* and DatanodeMetric can reply by saying it can be met or not.
*
* @param resourceNeeded - ResourceNeeded in its own metric.
* @return boolean, True if this resource requirement can be met.
*/
boolean hasResources(S resourceNeeded) throws SCMException;
/**
* Returns the metric.
*
* @return T, the object that represents this metric.
*/
T get();
/**
* Sets the value of this metric.
*
* @param value - value of the metric.
*/
void set(T value);
/**
* Adds a value of to the base.
* @param value - value
*/
void add(T value);
/**
* subtract a value.
* @param value value
*/
void subtract(T value);
}

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.metrics;
/**
* An helper class for all metrics based on Longs.
*/
public class LongMetric implements DatanodeMetric<Long, Long> {
private Long value;
/**
* Constructs a long Metric.
*
* @param value Value for this metric.
*/
public LongMetric(Long value) {
this.value = value;
}
/**
* Some syntactic sugar over Comparable interface. This makes code easier to
* read.
*
* @param o - Other Object
* @return - True if *this* object is greater than argument.
*/
@Override
public boolean isGreater(Long o) {
return compareTo(o) > 0;
}
/**
* Inverse of isGreater.
*
* @param o - other object.
* @return True if *this* object is Lesser than argument.
*/
@Override
public boolean isLess(Long o) {
return compareTo(o) < 0;
}
/**
* Returns true if the object has same values. Because of issues with
* equals, and loss of type information this interface supports isEqual.
*
* @param o object to compare.
* @return True, if the values match.
*/
@Override
public boolean isEqual(Long o) {
return compareTo(o) == 0;
}
/**
* A resourceCheck, defined by resourceNeeded.
* For example, S could be bytes required
* and DatanodeMetric can reply by saying it can be met or not.
*
* @param resourceNeeded - ResourceNeeded in its own metric.
* @return boolean, True if this resource requirement can be met.
*/
@Override
public boolean hasResources(Long resourceNeeded) {
return isGreater(resourceNeeded);
}
/**
* Returns the metric.
*
* @return T, the object that represents this metric.
*/
@Override
public Long get() {
return this.value;
}
/**
* Sets the value of this metric.
*
* @param value - value of the metric.
*/
@Override
public void set(Long value) {
this.value = value;
}
/**
* Adds a value of to the base.
*
* @param value - value
*/
@Override
public void add(Long value) {
this.value += value;
}
/**
* subtract a value.
*
* @param value value
*/
@Override
public void subtract(Long value) {
this.value -= value;
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object is
* less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(Long o) {
return Long.compare(this.value, o);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongMetric that = (LongMetric) o;
return value != null ? value.equals(that.value) : that.value == null;
}
@Override
public int hashCode() {
return value != null ? value.hashCode() : 0;
}
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* SCM Node Metric that is used in the placement classes.
*/
public class SCMNodeMetric implements DatanodeMetric<SCMNodeStat, Long> {
private SCMNodeStat stat;
/**
* Constructs an SCMNode Metric.
*
* @param stat - SCMNodeStat.
*/
public SCMNodeMetric(SCMNodeStat stat) {
this.stat = stat;
}
/**
* Set the capacity, used and remaining space on a datanode.
*
* @param capacity in bytes
* @param used in bytes
* @param remaining in bytes
*/
@VisibleForTesting
public SCMNodeMetric(long capacity, long used, long remaining) {
this.stat = new SCMNodeStat();
this.stat.set(capacity, used, remaining);
}
/**
*
* @param o - Other Object
* @return - True if *this* object is greater than argument.
*/
@Override
public boolean isGreater(SCMNodeStat o) {
Preconditions.checkNotNull(o, "Argument cannot be null");
// if zero, replace with 1 for the division to work.
long thisDenominator = (this.stat.getCapacity().get() == 0)
? 1 : this.stat.getCapacity().get();
long otherDenominator = (o.getCapacity().get() == 0)
? 1 : o.getCapacity().get();
float thisNodeWeight =
stat.getScmUsed().get() / (float) thisDenominator;
float oNodeWeight =
o.getScmUsed().get() / (float) otherDenominator;
if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) {
return thisNodeWeight > oNodeWeight;
}
// if these nodes are have similar weight then return the node with more
// free space as the greater node.
return stat.getRemaining().isGreater(o.getRemaining().get());
}
/**
* Inverse of isGreater.
*
* @param o - other object.
* @return True if *this* object is Lesser than argument.
*/
@Override
public boolean isLess(SCMNodeStat o) {
Preconditions.checkNotNull(o, "Argument cannot be null");
// if zero, replace with 1 for the division to work.
long thisDenominator = (this.stat.getCapacity().get() == 0)
? 1 : this.stat.getCapacity().get();
long otherDenominator = (o.getCapacity().get() == 0)
? 1 : o.getCapacity().get();
float thisNodeWeight =
stat.getScmUsed().get() / (float) thisDenominator;
float oNodeWeight =
o.getScmUsed().get() / (float) otherDenominator;
if (Math.abs(thisNodeWeight - oNodeWeight) > 0.000001) {
return thisNodeWeight < oNodeWeight;
}
// if these nodes are have similar weight then return the node with less
// free space as the lesser node.
return stat.getRemaining().isLess(o.getRemaining().get());
}
/**
* Returns true if the object has same values. Because of issues with
* equals, and loss of type information this interface supports isEqual.
*
* @param o object to compare.
* @return True, if the values match.
* TODO : Consider if it makes sense to add remaining to this equation.
*/
@Override
public boolean isEqual(SCMNodeStat o) {
float thisNodeWeight = stat.getScmUsed().get() / (float)
stat.getCapacity().get();
float oNodeWeight = o.getScmUsed().get() / (float) o.getCapacity().get();
return Math.abs(thisNodeWeight - oNodeWeight) < 0.000001;
}
/**
* A resourceCheck, defined by resourceNeeded.
* For example, S could be bytes required
* and DatanodeMetric can reply by saying it can be met or not.
*
* @param resourceNeeded - ResourceNeeded in its own metric.
* @return boolean, True if this resource requirement can be met.
*/
@Override
public boolean hasResources(Long resourceNeeded) {
return false;
}
/**
* Returns the metric.
*
* @return T, the object that represents this metric.
*/
@Override
public SCMNodeStat get() {
return stat;
}
/**
* Sets the value of this metric.
*
* @param value - value of the metric.
*/
@Override
public void set(SCMNodeStat value) {
stat.set(value.getCapacity().get(), value.getScmUsed().get(),
value.getRemaining().get());
}
/**
* Adds a value of to the base.
*
* @param value - value
*/
@Override
public void add(SCMNodeStat value) {
stat.add(value);
}
/**
* subtract a value.
*
* @param value value
*/
@Override
public void subtract(SCMNodeStat value) {
stat.subtract(value);
}
/**
* Compares this object with the specified object for order. Returns a
* negative integer, zero, or a positive integer as this object is less
* than, equal to, or greater than the specified object.
*
* @param o the object to be compared.
* @return a negative integer, zero, or a positive integer as this object is
* less than, equal to, or greater than the specified object.
* @throws NullPointerException if the specified object is null
* @throws ClassCastException if the specified object's type prevents it
* from being compared to this object.
*/
@Override
public int compareTo(SCMNodeStat o) {
if (isEqual(o)) {
return 0;
}
if (isGreater(o)) {
return 1;
} else {
return -1;
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SCMNodeMetric that = (SCMNodeMetric) o;
return stat != null ? stat.equals(that.stat) : that.stat == null;
}
@Override
public int hashCode() {
return stat != null ? stat.hashCode() : 0;
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.metrics;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* This class represents the SCM node stat.
*/
public class SCMNodeStat implements NodeStat {
private LongMetric capacity;
private LongMetric scmUsed;
private LongMetric remaining;
public SCMNodeStat() {
this(0L, 0L, 0L);
}
public SCMNodeStat(SCMNodeStat other) {
this(other.capacity.get(), other.scmUsed.get(), other.remaining.get());
}
public SCMNodeStat(long capacity, long used, long remaining) {
Preconditions.checkArgument(capacity >= 0, "Capacity cannot be " +
"negative.");
Preconditions.checkArgument(used >= 0, "used space cannot be " +
"negative.");
Preconditions.checkArgument(remaining >= 0, "remaining cannot be " +
"negative");
this.capacity = new LongMetric(capacity);
this.scmUsed = new LongMetric(used);
this.remaining = new LongMetric(remaining);
}
/**
* @return the total configured capacity of the node.
*/
public LongMetric getCapacity() {
return capacity;
}
/**
* @return the total SCM used space on the node.
*/
public LongMetric getScmUsed() {
return scmUsed;
}
/**
* @return the total remaining space available on the node.
*/
public LongMetric getRemaining() {
return remaining;
}
/**
* Set the capacity, used and remaining space on a datanode.
*
* @param capacity in bytes
* @param used in bytes
* @param remaining in bytes
*/
@VisibleForTesting
public void set(long capacity, long used, long remaining) {
Preconditions.checkNotNull(capacity, "Capacity cannot be null");
Preconditions.checkNotNull(used, "used cannot be null");
Preconditions.checkNotNull(remaining, "remaining cannot be null");
Preconditions.checkArgument(capacity >= 0, "Capacity cannot be " +
"negative.");
Preconditions.checkArgument(used >= 0, "used space cannot be " +
"negative.");
Preconditions.checkArgument(remaining >= 0, "remaining cannot be " +
"negative");
this.capacity = new LongMetric(capacity);
this.scmUsed = new LongMetric(used);
this.remaining = new LongMetric(remaining);
}
/**
* Adds a new nodestat to existing values of the node.
*
* @param stat Nodestat.
* @return SCMNodeStat
*/
public SCMNodeStat add(NodeStat stat) {
this.capacity.set(this.getCapacity().get() + stat.getCapacity().get());
this.scmUsed.set(this.getScmUsed().get() + stat.getScmUsed().get());
this.remaining.set(this.getRemaining().get() + stat.getRemaining().get());
return this;
}
/**
* Subtracts the stat values from the existing NodeStat.
*
* @param stat SCMNodeStat.
* @return Modified SCMNodeStat
*/
public SCMNodeStat subtract(NodeStat stat) {
this.capacity.set(this.getCapacity().get() - stat.getCapacity().get());
this.scmUsed.set(this.getScmUsed().get() - stat.getScmUsed().get());
this.remaining.set(this.getRemaining().get() - stat.getRemaining().get());
return this;
}
@Override
public boolean equals(Object to) {
if (to instanceof SCMNodeStat) {
SCMNodeStat tempStat = (SCMNodeStat) to;
return capacity.isEqual(tempStat.getCapacity().get()) &&
scmUsed.isEqual(tempStat.getScmUsed().get()) &&
remaining.isEqual(tempStat.getRemaining().get());
}
return false;
}
@Override
public int hashCode() {
return Long.hashCode(capacity.get() ^ scmUsed.get() ^ remaining.get());
}
}

View File

@ -0,0 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement.metrics;
// Various metrics supported by Datanode and used by SCM in the placement
// strategy.

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container.placement;
// Classes related to container placement.

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -16,19 +16,13 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.exceptions;
import java.io.IOException;
/**
* Exception thrown by SCM.
*/
public class SCMException extends IOException {
/**
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {
FAILED_TO_LOAD_NODEPOOL,
NODE_NOT_FOUND_IN_NODEPOOL,
}
private final ResultCodes result;
/**
@ -88,4 +82,24 @@ public SCMException(Throwable cause, ResultCodes result) {
super(cause);
this.result = result;
}
/**
* Returns resultCode.
* @return ResultCode
*/
public ResultCodes getResult() {
return result;
}
/**
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {
FAILED_TO_LOAD_NODEPOOL,
FAILED_TO_FIND_NODE_IN_POOL,
FAILED_TO_FIND_HEALTHY_NODES,
FAILED_TO_FIND_NODES_WITH_SPACE,
FAILED_TO_FIND_SUITABLE_NODE,
INVALID_CAPACITY
}
}

View File

@ -21,6 +21,8 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import java.io.Closeable;
import java.util.List;
@ -127,7 +129,7 @@ enum NODESTATE {
* @param datanodeID - datanode ID.
* @return node stat if it is live/stale, null if it is dead or does't exist.
*/
SCMNodeStat getNodeStat(DatanodeID datanodeID);
SCMNodeMetric getNodeStat(DatanodeID datanodeID);
/**
* Wait for the heartbeat is processed by NodeManager.

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
package org.apache.hadoop.ozone.scm.container.placement.metrics;
import com.google.common.annotations.VisibleForTesting;
@ -28,28 +28,28 @@ interface NodeStat {
* Get capacity of the node.
* @return capacity of the node.
*/
long getCapacity();
LongMetric getCapacity();
/**
* Get the used space of the node.
* @return the used space of the node.
*/
long getScmUsed();
LongMetric getScmUsed();
/**
* Get the remaining space of the node.
* @return the remaining space of the node.
*/
long getRemaining();
LongMetric getRemaining();
/**
* Set the total/used/remaining space.
* @param total - total space.
* @param capacity - total space.
* @param used - used space.
* @param remain - remaining space.
*/
@VisibleForTesting
void set(long total, long used, long remain);
void set(long capacity, long used, long remain);
/**
* Adding of the stat.

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import java.util.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.map.HashedMap;
@ -42,6 +42,8 @@
.proto.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.scm.VersionInfo;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
@ -165,7 +167,7 @@ public SCMNodeManager(OzoneConfiguration conf, String clusterID)
executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCM Heartbeat Processing Thread - %d").build());
this.inManualChillMode = Optional.absent();
this.inManualChillMode = Optional.empty();
Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
executorService.schedule(this, heartbeatCheckerIntervalMs,
@ -290,7 +292,7 @@ public boolean isOutOfNodeChillMode() {
*/
@Override
public void clearChillModeFlag() {
this.inManualChillMode = Optional.absent();
this.inManualChillMode = Optional.empty();
}
/**
@ -768,8 +770,8 @@ public Map<String, SCMNodeStat> getNodeStats() {
* @return node stat if it is live/stale, null if it is dead or does't exist.
*/
@Override
public SCMNodeStat getNodeStat(DatanodeID datanodeID) {
return nodeStats.get(datanodeID.getDatanodeUuid());
public SCMNodeMetric getNodeStat(DatanodeID datanodeID) {
return new SCMNodeMetric(nodeStats.get(datanodeID.getDatanodeUuid()));
}
@Override

View File

@ -46,7 +46,7 @@
import static org.apache.hadoop.ozone.scm
.exceptions.SCMException.ResultCodes.FAILED_TO_LOAD_NODEPOOL;
import static org.apache.hadoop.ozone.scm
.exceptions.SCMException.ResultCodes.NODE_NOT_FOUND_IN_NODEPOOL;
.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_NODE_IN_POOL;
import static org.apache.hadoop.scm
.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.scm
@ -183,7 +183,7 @@ public void removeNode(final String pool, final DatanodeID node)
if (kData == null) {
throw new SCMException(String.format("Unable to find node %s from" +
" pool %s in DB.", DFSUtil.bytes2String(kName), pool),
NODE_NOT_FOUND_IN_NODEPOOL);
FAILED_TO_FIND_NODE_IN_POOL);
}
nodePoolStore.delete(kName);
@ -194,7 +194,7 @@ public void removeNode(final String pool, final DatanodeID node)
} else {
throw new SCMException(String.format("Unable to find node %s from" +
" pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
NODE_NOT_FOUND_IN_NODEPOOL); }
FAILED_TO_FIND_NODE_IN_POOL); }
} finally {
lock.writeLock().unlock();
}

View File

@ -1,100 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import com.google.common.annotations.VisibleForTesting;
/**
* This class represents the SCM node stat.
*/
public final class SCMNodeStat implements NodeStat {
private long capacity;
private long scmUsed;
private long remaining;
public SCMNodeStat() {
}
public SCMNodeStat(final SCMNodeStat other) {
set(other.capacity, other.scmUsed, other.remaining);
}
/**
* @return the total configured capacity of the node.
*/
@Override
public long getCapacity() {
return capacity;
}
/**
* @return the total SCM used space on the node.
*/
@Override
public long getScmUsed() {
return scmUsed;
}
/**
* @return the total remaining space available on the node.
*/
@Override
public long getRemaining() {
return remaining;
}
@VisibleForTesting
@Override
public void set(final long total, final long used, final long remain) {
this.capacity = total;
this.scmUsed = used;
this.remaining = remain;
}
@Override
public SCMNodeStat add(final NodeStat stat) {
this.capacity += stat.getCapacity();
this.scmUsed += stat.getScmUsed();
this.remaining += stat.getRemaining();
return this;
}
@Override
public SCMNodeStat subtract(final NodeStat stat) {
this.capacity -= stat.getCapacity();
this.scmUsed -= stat.getScmUsed();
this.remaining -= stat.getRemaining();
return this;
}
@Override
public boolean equals(final Object to) {
return this == to
|| (to instanceof SCMNodeStat
&& capacity == ((SCMNodeStat) to).getCapacity()
&& scmUsed == ((SCMNodeStat) to).getScmUsed()
&& remaining == ((SCMNodeStat) to).getRemaining());
}
@Override
public int hashCode() {
assert false : "hashCode not designed";
return 42; // any arbitrary constant will do
}
}

View File

@ -17,9 +17,8 @@
*/
package org.apache.hadoop.ozone;
import com.google.common.base.Optional;
import java.util.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -191,9 +190,7 @@ public OzoneClient createOzoneClient() throws OzoneException {
* Waits for the Ozone cluster to be ready for processing requests.
*/
public void waitOzoneReady() throws TimeoutException, InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
GenericTestUtils.waitFor(() -> {
if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY)
>= numDataNodes) {
return true;
@ -203,7 +200,6 @@ public Boolean get() {
numDataNodes);
return false;
}
}, 1000, 5 * 60 * 1000); //wait for 5 mins.
}
@ -216,15 +212,12 @@ public Boolean get() {
*/
public void waitTobeOutOfChillMode() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
GenericTestUtils.waitFor(() -> {
if (scm.getScmNodeManager().isOutOfNodeChillMode()) {
return true;
}
LOG.info("Waiting for cluster to be ready. No datanodes found");
return false;
}
}, 100, 45000);
}
@ -234,7 +227,7 @@ public void waitForHeartbeatProcessed() throws TimeoutException,
scm.getScmNodeManager().waitForHeartbeatProcessed(), 100,
4 * 1000);
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().getStats().getCapacity() > 0, 100,
scm.getScmNodeManager().getStats().getCapacity().get() > 0, 100,
4 * 1000);
}
@ -242,21 +235,20 @@ public void waitForHeartbeatProcessed() throws TimeoutException,
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder
extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder {
extends MiniDFSCluster.Builder {
private final OzoneConfiguration conf;
private final int defaultHBSeconds = 1;
private final int defaultProcessorMs = 100;
private static final int DEFAULT_HB_SECONDS = 1;
private static final int DEFAULT_PROCESSOR_MS = 100;
private final String path;
private final UUID runID;
private Optional<String> ozoneHandlerType = Optional.absent();
private Optional<String> ozoneHandlerType = java.util.Optional.empty();
private Optional<Boolean> enableTrace = Optional.of(false);
private Optional<Integer> hbSeconds = Optional.absent();
private Optional<Integer> hbProcessorInterval = Optional.absent();
private Optional<String> scmMetadataDir = Optional.absent();
private Optional<Integer> hbSeconds = Optional.empty();
private Optional<Integer> hbProcessorInterval = Optional.empty();
private Optional<String> scmMetadataDir = Optional.empty();
private Boolean ozoneEnabled = true;
private Boolean waitForChillModeFinish = true;
private int containerWorkerThreadInterval = 1;
private Boolean randomContainerPort = true;
/**
@ -268,9 +260,6 @@ public Builder(OzoneConfiguration conf) {
super(conf);
this.conf = conf;
// TODO : Remove this later, with SCM, NN and SCM can run together.
//this.nnTopology(new MiniDFSNNTopology()); // No NameNode required
URL p = conf.getClass().getResource("");
path = p.getPath().concat(MiniOzoneCluster.class.getSimpleName() + UUID
.randomUUID().toString());
@ -329,11 +318,6 @@ public Builder doNotwaitTobeOutofChillMode() {
return this;
}
public Builder setSCMContainerWorkerThreadInterval(int intervalInSeconds) {
containerWorkerThreadInterval = intervalInSeconds;
return this;
}
public String getPath() {
return path;
}
@ -391,7 +375,7 @@ private void configScmMetadata() throws IOException {
return;
}
// If user has not specified a path, create a UUID for this miniCluser
// If user has not specified a path, create a UUID for this miniCluster
// and create SCM under that directory.
Path scmPath = Paths.get(path, runID.toString(), "scm");
Files.createDirectories(scmPath);
@ -417,10 +401,12 @@ private void configureTrace() {
if (enableTrace.isPresent()) {
conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY,
enableTrace.get());
}
GenericTestUtils.setLogLevel(org.apache.log4j.Logger.getRootLogger(),
Level.ALL);
}
GenericTestUtils.setLogLevel(org.apache.log4j.Logger.getRootLogger(),
Level.INFO);
}
private void configureSCMheartbeat() {
if (hbSeconds.isPresent()) {
@ -429,7 +415,7 @@ private void configureSCMheartbeat() {
} else {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
defaultHBSeconds);
DEFAULT_HB_SECONDS);
}
if (hbProcessorInterval.isPresent()) {
@ -437,7 +423,7 @@ private void configureSCMheartbeat() {
hbProcessorInterval.get());
} else {
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
defaultProcessorMs);
DEFAULT_PROCESSOR_MS);
}
}

View File

@ -19,8 +19,8 @@
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.client.ContainerOperationClient;
@ -41,7 +41,7 @@
public class TestContainerOperations {
private static ScmClient storageClient;
private static MiniOzoneCluster cluster;;
private static MiniOzoneCluster cluster;
private static OzoneConfiguration ozoneConf;
@BeforeClass

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.placement;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.MockNodeManager;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
/**
* Asserts that allocation strategy works as expected.
*/
public class TestContainerPlacement {
private DescriptiveStatistics computeStatistics(NodeManager nodeManager) {
DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
for (DatanodeID id : nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY)) {
float weightedValue =
nodeManager.getNodeStat(id).get().getScmUsed().get() / (float)
nodeManager.getNodeStat(id).get().getCapacity().get();
descriptiveStatistics.addValue(weightedValue);
}
return descriptiveStatistics;
}
/**
* This test simulates lots of Cluster I/O and updates the metadata in SCM.
* We simulate adding and removing containers from the cluster. It asserts
* that our placement algorithm has taken the capacity of nodes into
* consideration by asserting that standard deviation of used space on these
* has improved.
*/
@Test
public void testCapacityPlacementYieldsBetterDataDistribution() throws
SCMException {
final int opsCount = 200 * 1000;
final int nodesRequired = 3;
Random random = new Random();
// The nature of init code in MockNodeManager yields similar clusters.
MockNodeManager nodeManagerCapacity = new MockNodeManager(true, 100);
MockNodeManager nodeManagerRandom = new MockNodeManager(true, 100);
DescriptiveStatistics beforeCapacity =
computeStatistics(nodeManagerCapacity);
DescriptiveStatistics beforeRandom = computeStatistics(nodeManagerRandom);
//Assert that our initial layout of clusters are similar.
assertEquals(beforeCapacity.getStandardDeviation(), beforeRandom
.getStandardDeviation(), 0.001);
SCMContainerPlacementCapacity capacityPlacer = new
SCMContainerPlacementCapacity(nodeManagerCapacity, new Configuration());
SCMContainerPlacementRandom randomPlacer = new
SCMContainerPlacementRandom(nodeManagerRandom, new Configuration());
for (int x = 0; x < opsCount; x++) {
long containerSize = random.nextInt(100) * OzoneConsts.GB;
List<DatanodeID> nodesCapacity =
capacityPlacer.chooseDatanodes(nodesRequired, containerSize);
assertEquals(nodesRequired, nodesCapacity.size());
List<DatanodeID> nodesRandom = randomPlacer.chooseDatanodes(nodesRequired,
containerSize);
// One fifth of all calls are delete
if (x % 5 == 0) {
deleteContainer(nodeManagerCapacity, nodesCapacity, containerSize);
deleteContainer(nodeManagerRandom, nodesRandom, containerSize);
} else {
createContainer(nodeManagerCapacity, nodesCapacity, containerSize);
createContainer(nodeManagerRandom, nodesRandom, containerSize);
}
}
DescriptiveStatistics postCapacity = computeStatistics(nodeManagerCapacity);
DescriptiveStatistics postRandom = computeStatistics(nodeManagerRandom);
// This is a very bold claim, and needs large number of I/O operations.
// The claim in this assertion is that we improved the data distribution
// of this cluster in relation to the start state of the cluster.
Assert.assertTrue(beforeCapacity.getStandardDeviation() >
postCapacity.getStandardDeviation());
// This asserts that Capacity placement yields a better placement
// algorithm than random placement, since both cluster started at an
// identical state.
Assert.assertTrue(postRandom.getStandardDeviation() >
postCapacity.getStandardDeviation());
}
private void deleteContainer(MockNodeManager nodeManager,
List<DatanodeID> nodes, long containerSize) {
for (DatanodeID id : nodes) {
nodeManager.delContainer(id, containerSize);
}
}
private void createContainer(MockNodeManager nodeManager,
List<DatanodeID> nodes, long containerSize) {
for (DatanodeID id : nodes) {
nodeManager.addContainer(id, containerSize);
}
}
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.placement;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests that test Metrics that support placement.
*/
public class TestDatanodeMetrics {
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testSCMNodeMetric() {
SCMNodeStat stat = new SCMNodeStat(100L, 10L, 90L);
assertEquals((long) stat.getCapacity().get(), 100L);
assertEquals((long) stat.getScmUsed().get(), 10L);
assertEquals((long) stat.getRemaining().get(), 90L);
SCMNodeMetric metric = new SCMNodeMetric(stat);
SCMNodeStat newStat = new SCMNodeStat(100L, 10L, 90L);
assertEquals((long) stat.getCapacity().get(), 100L);
assertEquals((long) stat.getScmUsed().get(), 10L);
assertEquals((long) stat.getRemaining().get(), 90L);
SCMNodeMetric newMetric = new SCMNodeMetric(newStat);
assertTrue(metric.isEqual(newMetric.get()));
newMetric.add(stat);
assertTrue(newMetric.isGreater(metric.get()));
SCMNodeMetric zeroMetric = new SCMNodeMetric(new SCMNodeStat());
// Assert we can handle zero capacity.
assertTrue(metric.isGreater(zeroMetric.get()));
}
}

View File

@ -22,8 +22,8 @@
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;

View File

@ -18,6 +18,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
@ -25,9 +26,9 @@
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
import java.io.IOException;
import java.util.HashMap;
@ -39,18 +40,52 @@
* Test Helper for testing container Mapping.
*/
public class MockNodeManager implements NodeManager {
private final List<DatanodeID> healthyNodes;
private static final int HEALTHY_NODE_COUNT = 10;
private final static NodeData[] NODES = {
new NodeData(10L * OzoneConsts.TB, OzoneConsts.GB),
new NodeData(64L * OzoneConsts.TB, 100 * OzoneConsts.GB),
new NodeData(128L * OzoneConsts.TB, 256 * OzoneConsts.GB),
new NodeData(40L * OzoneConsts.TB, OzoneConsts.TB),
new NodeData(256L * OzoneConsts.TB, 200 * OzoneConsts.TB),
new NodeData(20L * OzoneConsts.TB, 10 * OzoneConsts.GB),
new NodeData(32L * OzoneConsts.TB, 16 * OzoneConsts.TB),
new NodeData(OzoneConsts.TB, 900 * OzoneConsts.GB),
};
private final List<DatanodeID> healthyNodes;
private final Map<String, SCMNodeStat> nodeMetricMap;
private final SCMNodeStat aggregateStat;
private boolean chillmode;
public MockNodeManager() {
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>();
for (int x = 0; x < 10; x++) {
healthyNodes.add(SCMTestUtils.getDatanodeID());
this.nodeMetricMap = new HashMap<>();
aggregateStat = new SCMNodeStat();
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeID id = SCMTestUtils.getDatanodeID();
healthyNodes.add(id);
populateNodeMetric(id, x);
}
}
chillmode = false;
}
/**
* Invoked from ctor to create some node Metrics.
*
* @param datanodeID - Datanode ID
*/
private void populateNodeMetric(DatanodeID datanodeID, int x) {
SCMNodeStat newStat = new SCMNodeStat();
long remaining =
NODES[x % NODES.length].capacity - NODES[x % NODES.length].used;
newStat.set(
(NODES[x % NODES.length].capacity),
(NODES[x % NODES.length].used), remaining);
this.nodeMetricMap.put(datanodeID.toString(), newStat);
aggregateStat.add(newStat);
}
/**
* Sets the chill mode value.
* @param chillmode boolean
@ -184,7 +219,7 @@ public boolean isInManualChillMode() {
*/
@Override
public SCMNodeStat getStats() {
return null;
return aggregateStat;
}
/**
@ -193,7 +228,7 @@ public SCMNodeStat getStats() {
*/
@Override
public Map<String, SCMNodeStat> getNodeStats() {
return null;
return nodeMetricMap;
}
/**
@ -202,8 +237,8 @@ public Map<String, SCMNodeStat> getNodeStats() {
* @return node stat if it is live/stale, null if it is dead or does't exist.
*/
@Override
public SCMNodeStat getNodeStat(DatanodeID datanodeID) {
return null;
public SCMNodeMetric getNodeStat(DatanodeID datanodeID) {
return new SCMNodeMetric(nodeMetricMap.get(datanodeID.toString()));
}
/**
@ -283,6 +318,27 @@ public SCMCommand register(DatanodeID datanodeID) {
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
SCMNodeReport nodeReport) {
if ((datanodeID != null) && (nodeReport != null) && (nodeReport
.getStorageReportCount() > 0)) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString());
long totalCapacity = 0L;
long totalRemaining = 0L;
long totalScmUsed = 0L;
List<StorageContainerDatanodeProtocolProtos.SCMStorageReport>
storageReports = nodeReport.getStorageReportList();
for (StorageContainerDatanodeProtocolProtos.SCMStorageReport report :
storageReports) {
totalCapacity += report.getCapacity();
totalRemaining +=report.getRemaining();
totalScmUsed += report.getScmUsed();
}
aggregateStat.subtract(stat);
stat.set(totalCapacity, totalScmUsed, totalRemaining);
aggregateStat.add(stat);
nodeMetricMap.put(datanodeID.toString(), stat);
}
return null;
}
@ -294,4 +350,71 @@ public Map<String, Integer> getNodeCount() {
}
return nodeCountMap;
}
/**
* Makes it easy to add a container.
*
* @param datanodeID datanode ID
* @param size number of bytes.
*/
public void addContainer(DatanodeID datanodeID, long size) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString());
if (stat != null) {
aggregateStat.subtract(stat);
stat.getCapacity().add(size);
aggregateStat.add(stat);
nodeMetricMap.put(datanodeID.toString(), stat);
}
}
/**
* Makes it easy to simulate a delete of a container.
*
* @param datanodeID datanode ID
* @param size number of bytes.
*/
public void delContainer(DatanodeID datanodeID, long size) {
SCMNodeStat stat = this.nodeMetricMap.get(datanodeID.toString());
if (stat != null) {
aggregateStat.subtract(stat);
stat.getCapacity().subtract(size);
aggregateStat.add(stat);
nodeMetricMap.put(datanodeID.toString(), stat);
}
}
/**
* A class to declare some values for the nodes so that our tests
* won't fail.
*/
private static class NodeData {
private long capacity, used;
/**
* Constructs a nodeDefinition.
*
* @param capacity capacity.
* @param used used.
*/
NodeData(long capacity, long used) {
this.capacity = capacity;
this.used = used;
}
public long getCapacity() {
return capacity;
}
public void setCapacity(long capacity) {
this.capacity = capacity;
}
public long getUsed() {
return used;
}
public void setUsed(long used) {
this.used = used;
}
}
}

View File

@ -59,15 +59,17 @@ public static void setUp() throws Exception {
testDir = Paths.get(path).toFile();
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test diectory path");
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager();
nodeManager = new MockNodeManager(true, 10);
mapping = new ContainerMapping(conf, nodeManager, 128);
}
@AfterClass
public static void cleanup() throws IOException {
if(mapping != null) {
mapping.close();
}
FileUtil.fullyDelete(testDir);
}

View File

@ -26,10 +26,11 @@
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -93,6 +94,7 @@ ContainerMapping createContainerManager(Configuration config,
return new ContainerMapping(config, scmNodeManager, cacheSize);
}
/**
* Test capacity based container placement policy with node reports.
*
@ -139,11 +141,11 @@ public void testContainerPlacementCapacity() throws IOException,
100, 4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(capacity * nodeCount,
nodeManager.getStats().getCapacity());
(long) nodeManager.getStats().getCapacity().get());
assertEquals(used * nodeCount,
nodeManager.getStats().getScmUsed());
(long) nodeManager.getStats().getScmUsed().get());
assertEquals(remaining * nodeCount,
nodeManager.getStats().getRemaining());
(long) nodeManager.getStats().getRemaining().get());
assertTrue(nodeManager.isOutOfNodeChillMode());
@ -168,14 +170,14 @@ public void testContainerPlacementCapacity() throws IOException,
nrb.addStorageReport(srb).build());
}
GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining() ==
nodeCount * newRemaining,
GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining()
.get() == nodeCount * newRemaining,
100, 4 * 1000);
thrown.expect(IOException.class);
thrown.expectMessage(
startsWith("No healthy node found with enough remaining capacity to" +
" allocate container."));
startsWith("Unable to find enough nodes that meet the space " +
"requirement in healthy node set."));
String container2 = UUID.randomUUID().toString();
containerManager.allocateContainer(container2,
ScmClient.ReplicationFactor.THREE);

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -26,6 +26,7 @@
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
@ -45,14 +46,14 @@
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.StringStartsWith.startsWith;
@ -635,7 +636,8 @@ private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeID> list,
while (!Thread.currentThread().isInterrupted()) {
for (DatanodeID dn : list) {
manager.sendHeartbeat(dn, null);
} Thread.sleep(sleepDuration);
}
Thread.sleep(sleepDuration);
}
}
@ -690,7 +692,6 @@ public void testScmClusterIsInExpectedState2() throws IOException,
conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List<DatanodeID> healthyNodeList = createNodeSet(nodeManager,
healthyCount, "Healthy");
@ -952,12 +953,12 @@ public void testScmStatsFromNodeReport() throws IOException,
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(capacity * nodeCount,
nodeManager.getStats().getCapacity());
assertEquals(used * nodeCount,
nodeManager.getStats().getScmUsed());
assertEquals(remaining * nodeCount,
nodeManager.getStats().getRemaining());
assertEquals(capacity * nodeCount, (long) nodeManager.getStats()
.getCapacity().get());
assertEquals(used * nodeCount, (long) nodeManager.getStats()
.getScmUsed().get());
assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
.getRemaining().get());
}
}
@ -999,30 +1000,40 @@ public void testScmNodeReportUpdate() throws IOException,
}
final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
final long expectedRemaining = capacity -
usedPerHeartbeat * (heartbeatCount - 1);
final long expectedRemaining = capacity - expectedScmUsed;
GenericTestUtils.waitFor(
() -> nodeManager.getStats().getScmUsed() == expectedScmUsed, 100,
4 * 1000);
() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
100, 4 * 1000);
assertEquals(capacity, nodeManager.getStats().getCapacity());
assertEquals(expectedScmUsed, nodeManager.getStats().getScmUsed());
assertEquals(expectedRemaining, nodeManager.getStats().getRemaining());
long foundCapacity = nodeManager.getStats().getCapacity().get();
assertEquals(capacity, foundCapacity);
long foundScmUsed = nodeManager.getStats().getScmUsed().get();
assertEquals(expectedScmUsed, foundScmUsed);
long foundRemaining = nodeManager.getStats().getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
// Test NodeManager#getNodeStats
assertEquals(nodeCount, nodeManager.getNodeStats().size());
assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity());
assertEquals(expectedScmUsed,
nodeManager.getNodeStat(datanodeID).getScmUsed());
assertEquals(expectedRemaining,
nodeManager.getNodeStat(datanodeID).getRemaining());
long nodeCapacity = nodeManager.getNodeStat(datanodeID).get()
.getCapacity().get();
assertEquals(capacity, nodeCapacity);
foundScmUsed = nodeManager.getNodeStat(datanodeID).get().getScmUsed()
.get();
assertEquals(expectedScmUsed, foundScmUsed);
foundRemaining = nodeManager.getNodeStat(datanodeID).get()
.getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
// Compare the result from
// NodeManager#getNodeStats and NodeManager#getNodeStat
SCMNodeStat stat1 = nodeManager.getNodeStats().
get(datanodeID.getDatanodeUuid());
SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeID);
SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeID).get();
assertEquals(stat1, stat2);
// Wait up to 4s so that the node becomes stale
@ -1031,11 +1042,17 @@ public void testScmNodeReportUpdate() throws IOException,
() -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeStats().size());
assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity());
assertEquals(expectedScmUsed,
nodeManager.getNodeStat(datanodeID).getScmUsed());
assertEquals(expectedRemaining,
nodeManager.getNodeStat(datanodeID).getRemaining());
foundCapacity = nodeManager.getNodeStat(datanodeID).get()
.getCapacity().get();
assertEquals(capacity, foundCapacity);
foundScmUsed = nodeManager.getNodeStat(datanodeID).get()
.getScmUsed().get();
assertEquals(expectedScmUsed, foundScmUsed);
foundRemaining = nodeManager.getNodeStat(datanodeID).get().
getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
// Wait up to 4 more seconds so the node becomes dead
// Verify usage info should be updated.
@ -1044,11 +1061,16 @@ public void testScmNodeReportUpdate() throws IOException,
4 * 1000);
assertEquals(0, nodeManager.getNodeStats().size());
assertEquals(0, nodeManager.getStats().getCapacity());
assertEquals(0, nodeManager.getStats().getScmUsed());
assertEquals(0, nodeManager.getStats().getRemaining());
foundCapacity = nodeManager.getStats().getCapacity().get();
assertEquals(0, foundCapacity);
// Send a new report to bring the dead node back to healty
foundScmUsed = nodeManager.getStats().getScmUsed().get();
assertEquals(0, foundScmUsed);
foundRemaining = nodeManager.getStats().getRemaining().get();
assertEquals(0, foundRemaining);
// Send a new report to bring the dead node back to healthy
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
@ -1063,14 +1085,18 @@ public void testScmNodeReportUpdate() throws IOException,
() -> nodeManager.getNodeCount(NodeManager.NODESTATE.HEALTHY) == 1,
100, 5 * 1000);
GenericTestUtils.waitFor(
() -> nodeManager.getStats().getScmUsed() == expectedScmUsed, 100,
4 * 1000);
() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed,
100, 4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeStats().size());
assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity());
assertEquals(expectedScmUsed,
nodeManager.getNodeStat(datanodeID).getScmUsed());
assertEquals(expectedRemaining,
nodeManager.getNodeStat(datanodeID).getRemaining());
foundCapacity = nodeManager.getNodeStat(datanodeID).get()
.getCapacity().get();
assertEquals(capacity, foundCapacity);
foundScmUsed = nodeManager.getNodeStat(datanodeID).get().getScmUsed()
.get();
assertEquals(expectedScmUsed, foundScmUsed);
foundRemaining = nodeManager.getNodeStat(datanodeID).get()
.getRemaining().get();
assertEquals(expectedRemaining, foundRemaining);
}
}
}

View File

@ -24,8 +24,8 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.test.PathUtils;
import org.junit.Rule;