HDFS-11469. Ozone: SCM: Container allocation based on node report. Contributed by Xiaoyu Yao.

Anu Engineer 2017-03-14 11:54:26 -07:00 committed by Owen O'Malley
parent fc7d678d3d
commit 932423211f
40 changed files with 1335 additions and 390 deletions

View File

@ -203,6 +203,8 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ChunkedArrayList;
@ -2773,6 +2775,19 @@ public class PBHelperClient {
return result;
}
public static ContainerRequestProto.ReplicationFactor
convertReplicationFactor(ScmClient.ReplicationFactor replicationFactor) {
switch (replicationFactor) {
case ONE:
return ContainerRequestProto.ReplicationFactor.ONE;
case THREE:
return ContainerRequestProto.ReplicationFactor.THREE;
default:
throw new IllegalArgumentException("Ozone only supports replicaiton" +
" factor 1 or 3");
}
}
public static XAttr convertXAttr(XAttrProto a) {
XAttr.Builder builder = new XAttr.Builder();
builder.setNameSpace(convert(a.getNamespace()));

View File

@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This class contains constants for configuration keys used in SCM
* This class contains constants for configuration keys used in SCM.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
@ -123,4 +123,18 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_DB_CACHE_SIZE_MB =
"ozone.scm.db.cache.size.mb";
public static final int OZONE_SCM_DB_CACHE_SIZE_DEFAULT = 128;
public static final String OZONE_SCM_CONTAINER_SIZE_GB =
"ozone.scm.container.size.gb";
public static final int OZONE_SCM_CONTAINER_SIZE_DEFAULT = 5;
public static final String OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY =
"ozone.scm.container.placement.impl";
/**
* Never constructed.
*/
private ScmConfigKeys() {
}
}
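
The two keys added above are read from a plain Hadoop Configuration. A minimal sketch of setting them programmatically, mirroring how TestContainerOperations later in this commit selects the capacity-based policy (the class name and values here are illustrative choices, not required defaults):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;

public final class ScmPlacementConfigSketch {
  static Configuration newScmConf() {
    Configuration conf = new Configuration();
    // Container size is expressed in whole GB; 5 is the shipped default.
    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, 5);
    // Pick the pluggable placement policy; SCMContainerPlacementRandom
    // is the fallback when this key is unset.
    conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
    return conf;
  }
}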

View File

@ -93,6 +93,38 @@ public class ContainerOperationClient implements ScmClient {
}
}
/**
* Creates a Container on SCM with specified replication factor.
* @param containerId - String container ID
* @param replicationFactor - replication factor
* @return Pipeline
* @throws IOException
*/
@Override
public Pipeline createContainer(String containerId,
ScmClient.ReplicationFactor replicationFactor) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
Pipeline pipeline =
storageContainerLocationClient.allocateContainer(containerId,
replicationFactor);
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.createContainer(client, traceID);
LOG.info("Created container " + containerId +
" leader:" + pipeline.getLeader() +
" machines:" + pipeline.getMachines() +
" replication factor:" + replicationFactor.getValue());
return pipeline;
} finally {
if (client != null) {
xceiverClientManager.releaseClient(client);
}
}
}
/**
* Delete the container; this will release any resources it uses.
* @param pipeline - Pipeline that represents the container.

View File

@ -64,4 +64,41 @@ public interface ScmClient {
* @throws IOException
*/
long getContainerSize(Pipeline pipeline) throws IOException;
/**
* Replication factors supported by Ozone and SCM.
*/
enum ReplicationFactor{
ONE(1),
THREE(3);
private final int value;
ReplicationFactor(int value) {
this.value = value;
}
public int getValue() {
return value;
}
public static ReplicationFactor parseReplicationFactor(int i) {
switch (i) {
case 1: return ONE;
case 3: return THREE;
default:
throw new IllegalArgumentException("Only replication factor 1 or 3" +
" is supported by Ozone/SCM.");
}
}
}
/**
* Creates a Container on SCM and returns the pipeline.
* @param containerId - String container ID
* @param replicationFactor - replication factor (only 1/3 is supported)
* @return Pipeline
* @throws IOException
*/
Pipeline createContainer(String containerId,
ReplicationFactor replicationFactor) throws IOException;
}
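
To show the new client surface end to end, here is a hedged usage sketch against any ScmClient implementation (ContainerOperationClient in this commit, or the MockStorageClient used by the CBlock tests); the container name and factor arguments are illustrative:

import java.io.IOException;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;

public final class CreateContainerSketch {
  // 'client' is any ScmClient implementation; its construction is elided.
  static Pipeline createReplicatedContainer(ScmClient client,
      String containerName, int factor) throws IOException {
    // parseReplicationFactor rejects anything other than 1 or 3.
    ScmClient.ReplicationFactor replicationFactor =
        ScmClient.ReplicationFactor.parseReplicationFactor(factor);
    return client.createContainer(containerName, replicationFactor);
  }
}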

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.scm.protocol;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
/**
@ -49,4 +50,16 @@ public interface StorageContainerLocationProtocol {
* @throws IOException
*/
Pipeline allocateContainer(String containerName) throws IOException;
/**
* Asks SCM where a container should be allocated. SCM responds with the
* set of datanodes that should be used for creating this container.
* @param containerName - Name of the container.
* @param replicationFactor - replication factor.
* @return Pipeline.
* @throws IOException
*/
Pipeline allocateContainer(String containerName,
ScmClient.ReplicationFactor replicationFactor) throws IOException;
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
@ -108,15 +109,31 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
*/
@Override
public Pipeline allocateContainer(String containerName) throws IOException {
return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE);
}
/**
* Asks SCM where a container should be allocated. SCM responds with the set
* of datanodes that should be used for creating this container. Ozone/SCM only
* supports replication factor of either 1 or 3.
*
* @param containerName - Name of the container.
* @param replicationFactor - replication factor.
* @return Pipeline.
* @throws IOException
*/
@Override
public Pipeline allocateContainer(String containerName,
ScmClient.ReplicationFactor replicationFactor) throws IOException {
Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
" be empty");
ContainerRequestProto request = ContainerRequestProto.newBuilder()
.setContainerName(containerName).build();
.setContainerName(containerName).setReplicationFactor(PBHelperClient
.convertReplicationFactor(replicationFactor)).build();
final ContainerResponseProto response;
final ContainerResponseProto response;
try {
response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {

View File

@ -62,6 +62,12 @@ message LocatedContainerProto {
*/
message ContainerRequestProto {
required string containerName = 1;
// Ozone only supports replication of either 1 or 3.
enum ReplicationFactor {
ONE = 1;
THREE = 3;
}
required ReplicationFactor replicationFactor = 2;
}
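
On the wire, the request carries the factor via the enum above. A condensed sketch of building the request, matching what the client-side translator does (the helper class and container name literal are illustrative):

import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.scm.client.ScmClient;

public final class ContainerRequestSketch {
  static ContainerRequestProto newRequest(String containerName) {
    return ContainerRequestProto.newBuilder()
        .setContainerName(containerName)
        .setReplicationFactor(PBHelperClient.convertReplicationFactor(
            ScmClient.ReplicationFactor.THREE))
        .build();
  }
}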
/**

View File

@ -180,7 +180,8 @@ public class StorageManager {
ArrayList<String> containerIds = new ArrayList<>();
while (allocatedSize < volumeSize) {
Pipeline pipeline = storageClient.createContainer(
KeyUtil.getContainerName(userName, volumeName, containerIdx));
KeyUtil.getContainerName(userName, volumeName, containerIdx),
ScmClient.ReplicationFactor.ONE);
ContainerDescriptor container =
new ContainerDescriptor(pipeline.getContainerName());
container.setPipeline(pipeline);

View File

@ -420,8 +420,8 @@ public final class OzoneClientUtils {
* that this value is greater than heartbeat interval and heartbeatProcess
* Interval.
*
* @param conf
* @return
* @param conf - Configuration.
* @return - the interval for dead node flagging.
*/
public static long getDeadNodeInterval(Configuration conf) {
long staleNodeIntervalMs = getStaleNodeInterval(conf);
@ -444,7 +444,7 @@ public final class OzoneClientUtils {
/**
* Returns the maximum number of heartbeat to process per loop of the process
* thread.
* @param conf Configration
* @param conf Configuration
* @return - int -- Number of HBs to process
*/
public static int getMaxHBToProcessPerLoop(Configuration conf) {

View File

@ -69,6 +69,10 @@ public final class OzoneConsts {
public final static String CHUNK_OVERWRITE = "OverWriteRequested";
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
public static final long KB = 1024L;
public static final long MB = KB * 1024L;
public static final long GB = MB * 1024L;
public static final long TB = GB * 1024L;
/**
* Supports Bucket Versioning.

View File

@ -190,10 +190,10 @@ public class ContainerData {
/**
* Set container Path.
* @param containerFilePath - File path.
* @param containerPath - File path.
*/
public void setContainerPath(String containerFilePath) {
this.containerFilePath = containerFilePath;
public void setContainerPath(String containerPath) {
this.containerFilePath = containerPath;
}
}

View File

@ -170,6 +170,8 @@ public class Dispatcher implements ContainerDispatcher {
default:
return ContainerUtils.unsupportedRequest(msg);
}
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, msg);
} catch (IOException ex) {
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
@ -212,6 +214,8 @@ public class Dispatcher implements ContainerDispatcher {
return ContainerUtils.unsupportedRequest(msg);
}
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, msg);
} catch (IOException ex) {
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
@ -253,6 +257,8 @@ public class Dispatcher implements ContainerDispatcher {
default:
return ContainerUtils.unsupportedRequest(msg);
}
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, msg);
} catch (IOException ex) {
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
@ -549,6 +555,8 @@ public class Dispatcher implements ContainerDispatcher {
keyData.setChunks(chunks);
this.containerManager.getKeyManager().putKey(pipeline, keyData);
return FileUtils.getPutFileResponse(msg);
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, msg);
} catch (IOException e) {
throw new StorageContainerException("Put Small File Failed.", e,
PUT_SMALL_FILE_ERROR);
@ -595,10 +603,11 @@ public class Dispatcher implements ContainerDispatcher {
metrics.incContainerBytesStats(Type.GetSmallFile, bytes);
return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
ChunkInfo.getFromProtoBuf(c));
} catch (StorageContainerException e) {
return ContainerUtils.logAndReturnError(LOG, e, msg);
} catch (IOException e) {
throw new StorageContainerException("Unable to decode protobuf", e,
throw new StorageContainerException("Get Small File Failed", e,
GET_SMALL_FILE_ERROR);
}
}
}

View File

@ -126,6 +126,7 @@ public class DatanodeStateMachine implements Closeable {
*/
@Override
public void close() throws IOException {
context.setState(DatanodeStates.getLastState());
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {

View File

@ -107,10 +107,10 @@ public class EndpointStateMachine implements Closeable {
/**
* Sets the endpoint state.
*
* @param state - state.
* @param epState - end point state.
*/
public EndPointStates setState(EndPointStates state) {
this.state = state;
public EndPointStates setState(EndPointStates epState) {
this.state = epState;
return this.state;
}

View File

@ -20,7 +20,8 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import java.io.IOException;
/** A server endpoint that acts as the communication layer for Ozone containers. */
/** A server endpoint that acts as the communication layer for Ozone
* containers. */
public interface XceiverServerSpi {
/** Starts the server. */
void start() throws IOException;

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import java.util.Map;
@ -31,22 +30,21 @@ import java.util.Map;
public interface SCMMXBean {
/**
* Get the number of data nodes that in all states,
* valid states are defined by {@link SCMNodeManager.NODESTATE}.
* Get the number of data nodes in all states.
*
* @return A map from node state to the number of nodes in that state
*/
public Map<String, Integer> getNodeCount();
Map<String, Integer> getNodeCount();
/**
* Get the SCM RPC server port that used to listen to datanode requests.
* @return SCM datanode RPC server port
*/
public String getDatanodeRpcPort();
String getDatanodeRpcPort();
/**
* Get the SCM RPC server port that used to listen to client requests.
* @return SCM client RPC server port
*/
public String getClientRpcPort();
String getClientRpcPort();
}

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -29,6 +30,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
@ -130,7 +132,7 @@ public class StorageContainerManager
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
/** SCM mxbean*/
/** SCM mxbean. */
private ObjectName scmInfoBeanName;
/**
@ -341,7 +343,24 @@ public class StorageContainerManager
*/
@Override
public Pipeline allocateContainer(String containerName) throws IOException {
return scmContainerManager.allocateContainer(containerName);
return scmContainerManager.allocateContainer(containerName,
ScmClient.ReplicationFactor.ONE);
}
/**
* Asks SCM where a container should be allocated. SCM responds with the set
* of datanodes that should be used for creating this container.
*
* @param containerName - Name of the container.
* @param replicationFactor - replication factor.
* @return Pipeline.
* @throws IOException
*/
@Override
public Pipeline allocateContainer(String containerName,
ScmClient.ReplicationFactor replicationFactor) throws IOException {
return scmContainerManager.allocateContainer(containerName,
replicationFactor);
}
/**
@ -396,6 +415,7 @@ public class StorageContainerManager
LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop();
unregisterMXBean();
IOUtils.closeQuietly(scmContainerManager);
}
/**

View File

@ -22,7 +22,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.LevelDBStore;
import org.slf4j.Logger;
@ -30,9 +33,10 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.iq80.leveldb.Options;
@ -50,7 +54,8 @@ public class ContainerMapping implements Mapping {
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
private final LevelDBStore containerStore;
private final Random rand;
private final ContainerPlacementPolicy placementPolicy;
private final long containerSize;
/**
* Constructs a mapping class that creates mapping between container names and
@ -61,10 +66,11 @@ public class ContainerMapping implements Mapping {
* @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
* its nodes. This is passed to LevelDB and this memory is allocated in Native
* code space. CacheSize is specified in MB.
* @throws IOException
*/
@SuppressWarnings("unchecked")
public ContainerMapping(Configuration conf, NodeManager nodeManager,
int cacheSizeMB) throws IOException {
public ContainerMapping(final Configuration conf,
final NodeManager nodeManager, final int cacheSizeMB) throws IOException {
this.nodeManager = nodeManager;
this.cacheSize = cacheSizeMB;
@ -76,7 +82,7 @@ public class ContainerMapping implements Mapping {
new IllegalArgumentException("SCM metadata directory is not valid.");
}
Options options = new Options();
options.cacheSize(this.cacheSize * (1024L * 1024L));
options.cacheSize(this.cacheSize * OzoneConsts.MB);
options.createIfMissing();
// Write the container name to pipeline mapping.
@ -84,30 +90,65 @@ public class ContainerMapping implements Mapping {
containerStore = new LevelDBStore(containerDBPath, options);
this.lock = new ReentrantLock();
rand = new Random();
this.containerSize = OzoneConsts.GB * conf.getInt(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
}
/**
* Create pluggable container placement policy implementation instance.
*
* @param nodeManager - SCM node manager.
* @param conf - configuration.
* @return SCM container placement policy implementation instance.
*/
private static ContainerPlacementPolicy createContainerPlacementPolicy(
final NodeManager nodeManager, final Configuration conf) {
Class<? extends ContainerPlacementPolicy> implClass =
(Class<? extends ContainerPlacementPolicy>) conf.getClass(
ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementRandom.class);
try {
Constructor<? extends ContainerPlacementPolicy> ctor =
implClass.getDeclaredConstructor(NodeManager.class,
Configuration.class);
return ctor.newInstance(nodeManager, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(implClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
// Surface construction failures instead of silently returning null.
throw new RuntimeException(implClass.getName()
+ " could not be constructed.", e);
}
}
/**
* // TODO : Fix the code to handle multiple nodes.
* Translates a list of nodes, ordered such that the first is the leader, into
* a corresponding {@link Pipeline} object.
*
* @param node datanode on which we will allocate the contianer.
* @param nodes - list of datanodes on which we will allocate the container.
* The first of the list will be the leader node.
* @param containerName container name
* @return pipeline corresponding to nodes
*/
private static Pipeline newPipelineFromNodes(DatanodeID node, String
containerName) {
Preconditions.checkNotNull(node);
String leaderId = node.getDatanodeUuid();
private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
final String containerName) {
Preconditions.checkNotNull(nodes);
Preconditions.checkArgument(nodes.size() > 0);
String leaderId = nodes.get(0).getDatanodeUuid();
Pipeline pipeline = new Pipeline(leaderId);
pipeline.addMember(node);
for (DatanodeID node : nodes) {
pipeline.addMember(node);
}
pipeline.setContainerName(containerName);
return pipeline;
}
/**
* Returns the Pipeline from the container name.
*
@ -115,7 +156,7 @@ public class ContainerMapping implements Mapping {
* @return - Pipeline that makes up this container.
*/
@Override
public Pipeline getContainer(String containerName) throws IOException {
public Pipeline getContainer(final String containerName) throws IOException {
Pipeline pipeline = null;
lock.lock();
try {
@ -141,7 +182,22 @@ public class ContainerMapping implements Mapping {
* @throws IOException
*/
@Override
public Pipeline allocateContainer(String containerName) throws IOException {
public Pipeline allocateContainer(final String containerName)
throws IOException {
return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE);
}
/**
* Allocates a new container.
*
* @param containerName - Name of the container.
* @param replicationFactor - replication factor of the container.
* @return - Pipeline that makes up this container.
* @throws IOException
*/
@Override
public Pipeline allocateContainer(final String containerName,
final ScmClient.ReplicationFactor replicationFactor) throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
Pipeline pipeline = null;
@ -157,9 +213,11 @@ public class ContainerMapping implements Mapping {
throw new IOException("Specified container already exists. key : " +
containerName);
}
DatanodeID id = getDatanodeID();
if (id != null) {
pipeline = newPipelineFromNodes(id, containerName);
List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
replicationFactor.getValue(), containerSize);
// TODO: handle under replicated container
if (datanodes != null && datanodes.size() > 0) {
pipeline = newPipelineFromNodes(datanodes, containerName);
containerStore.put(containerName.getBytes(encoding),
pipeline.getProtobufMessage().toByteArray());
}
@ -169,24 +227,6 @@ public class ContainerMapping implements Mapping {
return pipeline;
}
/**
* Returns a random Datanode ID from the list of healthy nodes.
*
* @return Datanode ID
* @throws IOException
*/
private DatanodeID getDatanodeID() throws IOException {
List<DatanodeID> healthyNodes =
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
if (healthyNodes.size() == 0) {
throw new IOException("No healthy node found to allocate container.");
}
int index = rand.nextInt() % healthyNodes.size();
return healthyNodes.get(Math.abs(index));
}
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import java.io.IOException;
import java.util.List;
/**
* A ContainerPlacementPolicy supports choosing datanodes to build a
* replication pipeline with specified constraints.
*/
public interface ContainerPlacementPolicy {
/**
* Given the replication factor and size required, return a set of datanodes
* that satisfies the node count and size requirements.
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return list of datanodes chosen.
* @throws IOException
*/
List<DatanodeID> chooseDatanodes(int nodesRequired, long sizeRequired)
throws IOException;
}
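
As a sketch of the contract, the hypothetical policy below returns the first N healthy nodes and ignores capacity; the two real policies added in this commit refine exactly this skeleton, and the reflective factory in ContainerMapping requires the (NodeManager, Configuration) constructor shown:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.node.NodeManager;

public final class FirstFitPlacementPolicy implements ContainerPlacementPolicy {
  private final NodeManager nodeManager;

  // Constructor signature looked up reflectively by ContainerMapping.
  public FirstFitPlacementPolicy(NodeManager nodeManager, Configuration conf) {
    this.nodeManager = nodeManager;
  }

  @Override
  public List<DatanodeID> chooseDatanodes(int nodesRequired, long sizeRequired)
      throws IOException {
    List<DatanodeID> healthy =
        nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
    if (healthy.size() < nodesRequired) {
      throw new IOException("Not enough healthy nodes: required "
          + nodesRequired + ", found " + healthy.size());
    }
    // Naive choice: first N healthy nodes; sizeRequired is ignored here.
    return healthy.subList(0, nodesRequired);
  }
}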

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
@ -44,4 +45,15 @@ public interface Mapping extends Closeable {
* @throws IOException
*/
Pipeline allocateContainer(String containerName) throws IOException;
/**
* Allocates a new container for a given keyName and replication factor.
*
* @param containerName - Name.
* @param replicationFactor - replication factor of the container.
* @return - Pipeline that makes up this container.
* @throws IOException
*/
Pipeline allocateContainer(String containerName,
ScmClient.ReplicationFactor replicationFactor) throws IOException;
}

View File

@ -0,0 +1,207 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeStat;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Math.abs;
/**
* Container placement policy that randomly chooses datanodes whose remaining
* space satisfies the size constraints.
*/
public final class SCMContainerPlacementCapacity
implements ContainerPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
private static int maxRetry = 100;
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;
public SCMContainerPlacementCapacity(final NodeManager nodeManager,
final Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
}
@Override
public List<DatanodeID> chooseDatanodes(final int nodesRequired,
final long sizeRequired) throws IOException {
List<DatanodeID> healthyNodes =
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
if (healthyNodes.size() == 0) {
throw new IOException("No healthy node found to allocate container.");
}
if (healthyNodes.size() < nodesRequired) {
throw new IOException("Not enough nodes to allocate container with " +
nodesRequired + " datanodes required.");
}
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
// TODO: add allocation time as metrics
long beginTime = Time.monotonicNow();
Set<DatanodeID> results = new HashSet<>();
for (int i = 0; i < nodesRequired; i++) {
DatanodeID candidate = chooseNode(results, healthyNodes, sizeRequired);
if (candidate != null) {
results.add(candidate);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding datanode {}. Results.size = {} nodesRequired = {}",
candidate, results.size(), nodesRequired);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Can't find datanode. Results.size = {} nodesRequired = {}",
results.size(), nodesRequired);
}
break;
}
}
if (LOG.isTraceEnabled()) {
long endTime = Time.monotonicNow();
LOG.trace("SCMContainerPlacementCapacity takes {} ms to choose nodes.",
endTime - beginTime);
}
// TODO: handle under replicated case.
// For now, throw exception only when we can't find any datanode.
if (results.size() == 0) {
throw new IOException("No healthy node found " +
"with enough remaining capacity to allocate container.");
}
if (results.size() != nodesRequired) {
if (LOG.isDebugEnabled()) {
LOG.debug("SCMContainerPlacementCapacity cannot find enough healthy" +
" datanodes with remaining capacity > {} ." +
"(nodesRequired = {}, nodesFound = {})", sizeRequired,
nodesRequired, results.size());
}
}
return results.stream().collect(Collectors.toList());
}
/**
* Choose one random node from 2-Random nodes that satisfy the size required.
* @param results - set of current chosen datanodes.
* @param healthyNodes - all healthy datanodes.
* @param sizeRequired - size required for container.
* @return one with larger remaining capacity from two randomly chosen
* datanodes that satisfy sizeRequirement but are not in current
* result set.
*/
private DatanodeID chooseNode(final Set results,
final List<DatanodeID> healthyNodes, final long sizeRequired) {
NodeAndStat firstNode = chooseOneNode(results, healthyNodes,
sizeRequired);
if (firstNode == null) {
return null;
}
NodeAndStat secondNode = chooseOneNode(results, healthyNodes,
sizeRequired);
if (secondNode == null) {
return firstNode.getDatanodeID();
}
// Pick one with larger remaining space.
return firstNode.getDatanodeStat().getRemaining() >
secondNode.getDatanodeStat().getRemaining() ?
firstNode.getDatanodeID() : secondNode.getDatanodeID();
}
/**
* Choose one random node from healthy nodes that satisfies the size
* requirement and has not been chosen in the existing results.
* Retry up to maxRetry(100) times.
* @param results - set of current chosen datanodes.
* @param healthyNodes - all healthy datanodes.
* @param sizeRequired - size required for container.
* @return a randomly chosen datanode that satisfies the size requirement
* and is not in the current result set, or null if none is found
* within maxRetry attempts.
*/
private NodeAndStat chooseOneNode(final Set<DatanodeID> results,
final List<DatanodeID> healthyNodes, final long sizeRequired) {
NodeAndStat selectedNode = null;
int retry = 0;
while (selectedNode == null && retry < maxRetry) {
int candidateIdx = abs(rand.nextInt() % healthyNodes.size());
DatanodeID candidate = healthyNodes.get(candidateIdx);
if (!results.contains(candidate)) {
SCMNodeStat stat = nodeManager.getNodeStat(candidate);
if (stat != null && stat.getRemaining() > sizeRequired) {
selectedNode = new NodeAndStat(candidate, stat);
break;
}
}
retry++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Find {} after {} retries!", (selectedNode != null) ?
selectedNode.getDatanodeID() : "no datanode", retry);
}
return selectedNode;
}
/**
* Helper class wraps DatanodeID and SCMNodeStat.
*/
static class NodeAndStat {
private final DatanodeID datanodeID;
private final SCMNodeStat stat;
NodeAndStat(final DatanodeID id, final SCMNodeStat stat) {
this.datanodeID = id;
this.stat = stat;
}
public DatanodeID getDatanodeID() {
return datanodeID;
}
public SCMNodeStat getDatanodeStat() {
return stat;
}
}
}
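
Design note: picking the roomier of two random candidates, as chooseNode does above, is the classic "power of two choices" load-balancing technique; it biases new containers toward emptier datanodes without sorting or scanning all node statistics.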

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.container;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static java.lang.Math.abs;
/**
* Container placement policy that randomly chooses healthy datanodes.
*/
public final class SCMContainerPlacementRandom
implements ContainerPlacementPolicy {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
private static int maxRetry = 100;
private final NodeManager nodeManager;
private final Random rand;
private final Configuration conf;
public SCMContainerPlacementRandom(final NodeManager nodeManager,
final Configuration conf) {
this.nodeManager = nodeManager;
this.rand = new Random();
this.conf = conf;
}
@Override
public List<DatanodeID> chooseDatanodes(final int nodesRequired,
final long sizeRequired) throws IOException {
List<DatanodeID> healthyNodes =
nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
if (healthyNodes.size() == 0) {
throw new IOException("No healthy node found to allocate container.");
}
if (healthyNodes.size() < nodesRequired) {
throw new IOException("Not enough nodes to allocate container with "
+ nodesRequired + " datanodes required.");
}
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
// TODO: add allocation time as metrics
long beginTime = Time.monotonicNow();
Set<DatanodeID> results = new HashSet<>();
for (int i = 0; i < nodesRequired; i++) {
DatanodeID candidate = chooseNode(results, healthyNodes);
if (candidate != null) {
results.add(candidate);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding datanode {}. Results.size = {} nodesRequired = {}",
candidate, results.size(), nodesRequired);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Can't find datanode. Results.size = {} nodesRequired = {}",
results.size(), nodesRequired);
}
break;
}
}
if (LOG.isTraceEnabled()) {
long endTime = Time.monotonicNow();
LOG.trace("SCMContainerPlacementRandom takes {} ms to choose nodes.",
endTime - beginTime);
}
if (results.size() != nodesRequired) {
if (LOG.isDebugEnabled()) {
LOG.debug("SCMContainerPlacementRandom cannot find enough healthy" +
" datanodes. (nodesRequired = {}, nodesFound = {})",
nodesRequired, results.size());
}
}
return results.stream().collect(Collectors.toList());
}
/**
* Choose one random node from 2-Random nodes. Retry up to maxRetry (100)
* times until one is found that has not been chosen in the existing results.
* @param results - set of current chosen datanodes.
* @param healthyNodes - all healthy datanodes.
* @return one randomly chosen datanode from two random candidates that are
* not in the current result set.
*/
private DatanodeID chooseNode(final Set<DatanodeID> results,
final List<DatanodeID> healthyNodes) {
DatanodeID selectedNode = null;
int retry = 0;
while (selectedNode == null && retry < maxRetry) {
DatanodeID firstNode = healthyNodes.get(
abs(rand.nextInt() % healthyNodes.size()));
DatanodeID secondNode = healthyNodes.get(
abs(rand.nextInt() % healthyNodes.size()));
// Randomly pick one from two candidates.
selectedNode = rand.nextBoolean() ? firstNode : secondNode;
if (results.contains(selectedNode)) {
selectedNode = null;
} else {
break;
}
retry++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Find {} after {} retries!", (selectedNode != null) ?
selectedNode : "no datanode", retry);
}
return selectedNode;
}
}

View File

@ -17,12 +17,14 @@
*/
package org.apache.hadoop.ozone.scm.node;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
/**
* A node manager supports a simple interface for managing a datanode.
@ -115,9 +117,22 @@ public interface NodeManager extends StorageContainerNodeProtocol,
SCMNodeStat getStats();
/**
* Return a list of node stats.
* @return a list of individual node stats (live/stale but not dead).
* Return a map of node stats.
* @return a map of individual node stats (live/stale but not dead).
*/
List<SCMNodeStat> getNodeStats();
Map<String, SCMNodeStat> getNodeStats();
/**
* Return the node stat of the specified datanode.
* @param datanodeID - datanode ID.
* @return node stat if it is live/stale, null if it is dead or doesn't exist.
*/
SCMNodeStat getNodeStat(DatanodeID datanodeID);
/**
* Wait until the heartbeat has been processed by the NodeManager.
* @return true if heartbeat has been processed.
*/
@VisibleForTesting
boolean waitForHeartbeatProcessed();
}
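
The new getNodeStat accessor is what makes capacity-aware placement possible. A small hedged helper showing the intended use (the filtering logic is illustrative, but getRemaining() is consulted the same way by SCMContainerPlacementCapacity):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeStat;

public final class CapacityFilterSketch {
  // Keep only nodes whose remaining space exceeds the requested size.
  static List<DatanodeID> withFreeSpace(NodeManager nodeManager,
      List<DatanodeID> candidates, long sizeRequired) {
    List<DatanodeID> fits = new ArrayList<>();
    for (DatanodeID node : candidates) {
      SCMNodeStat stat = nodeManager.getNodeStat(node);
      // getNodeStat returns null for dead or unknown datanodes.
      if (stat != null && stat.getRemaining() > sizeRequired) {
        fits.add(node);
      }
    }
    return fits;
  }
}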

View File

@ -33,20 +33,20 @@ public interface NodeManagerMXBean {
*
* @return int
*/
public int getMinimumChillModeNodes();
int getMinimumChillModeNodes();
/**
* Reports if we have exited out of chill mode by discovering enough nodes.
*
* @return True if we are out of Node layer chill mode, false otherwise.
*/
public boolean isOutOfNodeChillMode();
boolean isOutOfNodeChillMode();
/**
* Returns a chill mode status string.
* @return String
*/
public String getChillModeStatus();
String getChillModeStatus();
/**
@ -54,13 +54,12 @@ public interface NodeManagerMXBean {
* @return true if forceEnterChillMode has been called,
* false if forceExitChillMode or status is not set, e.g. clearChillModeFlag.
*/
public boolean isInManualChillMode();
boolean isInManualChillMode();
/**
* Get the number of data nodes that in all states,
* valid states are defined by {@link SCMNodeManager.NODESTATE}.
* Get the number of data nodes in all states.
*
* @return A map from node state to the number of nodes in that state
*/
public Map<String, Integer> getNodeCount();
Map<String, Integer> getNodeCount();
}

View File

@ -377,7 +377,8 @@ public class SCMNodeManager
* @return true if the HB check is done.
*/
@VisibleForTesting
public boolean waitForHeartbeatThead() {
@Override
public boolean waitForHeartbeatProcessed() {
return lastHBcheckFinished != 0;
}
@ -611,8 +612,8 @@ public class SCMNodeManager
*/
@Override
public void close() throws IOException {
executorService.shutdown();
unregisterMXBean();
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
@ -739,13 +740,22 @@ public class SCMNodeManager
}
/**
* Return a list of node stats.
* @return a list of individual node stats (live/stale but not dead).
* Return a map of node stats.
* @return a map of individual node stats (live/stale but not dead).
*/
@Override
public List<SCMNodeStat> getNodeStats(){
return nodeStats.entrySet().stream().map(
entry -> nodeStats.get(entry.getKey())).collect(Collectors.toList());
public Map<String, SCMNodeStat> getNodeStats() {
return Collections.unmodifiableMap(nodeStats);
}
/**
* Return the node stat of the specified datanode.
* @param datanodeID - datanode ID.
* @return node stat if it is live/stale, null if it is dead or doesn't exist.
*/
@Override
public SCMNodeStat getNodeStat(DatanodeID datanodeID) {
return nodeStats.get(datanodeID.getDatanodeUuid());
}
@Override

View File

@ -196,7 +196,7 @@ public final class OzoneMetadataManager {
metadataDB.get(args.getVolumeName().getBytes(encoding));
if (volumeName != null) {
LOG.debug("Volume already exists.");
LOG.debug("Volume {} already exists.", volumeName);
throw ErrorTable.newError(ErrorTable.VOLUME_ALREADY_EXISTS, args);
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.cblock.util;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -73,6 +74,14 @@ public class MockStorageClient implements ScmClient {
@Override
public long getContainerSize(Pipeline pipeline) throws IOException {
// just return a constant value for now
return 5L*1024*1024*1024; // 5GB
return 5L * OzoneConsts.GB; // 5GB
}
@Override
public Pipeline createContainer(String containerId,
ScmClient.ReplicationFactor replicationFactor) throws IOException {
currentContainerId += 1;
ContainerLookUpService.addContainer(Long.toString(currentContainerId));
return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
.getPipeline(); }
}

View File

@ -60,7 +60,8 @@ import static org.junit.Assert.assertFalse;
* convenient reuse of logic for starting DataNodes.
*/
@InterfaceAudience.Private
public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
public final class MiniOzoneCluster extends MiniDFSCluster
implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneCluster.class);
private static final String USER_AUTH = "hdfs";
@ -198,6 +199,16 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
}, 100, 45000);
}
public void waitForHeartbeatProcessed() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().waitForHeartbeatProcessed(), 100,
4 * 1000);
GenericTestUtils.waitFor(() ->
scm.getScmNodeManager().getStats().getCapacity() > 0, 100,
4 * 1000);
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@ -242,6 +253,12 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
return this;
}
@Override
public Builder storageCapacities(long[] capacities) {
super.storageCapacities(capacities);
return this;
}
public Builder setHandlerType(String handler) {
ozoneHandlerType = Optional.of(handler);
return this;
@ -347,7 +364,6 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
// datanodes in the cluster.
conf.setStrings(ScmConfigKeys.OZONE_SCM_DATANODE_ID,
scmPath.toString() + "/datanode.id");
}
private void configureHandler() {

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.ozone;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.client.ContainerOperationClient;
import org.apache.hadoop.scm.client.ScmClient;
@ -44,9 +47,14 @@ public class TestContainerOperations {
@BeforeClass
public static void setup() throws Exception {
int containerSizeGB = 5;
ContainerOperationClient.setContainerSizeB(containerSizeGB*1024*1024*1024L);
long datanodeCapacities = 3 * OzoneConsts.TB;
ContainerOperationClient.setContainerSizeB(
containerSizeGB * OzoneConsts.GB);
ozoneConf = new OzoneConfiguration();
ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster = new MiniOzoneCluster.Builder(ozoneConf).numDataNodes(1)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType("distributed").build();
StorageContainerLocationProtocolClientSideTranslatorPB client =
cluster.createStorageContainerLocationClient();
@ -54,6 +62,7 @@ public class TestContainerOperations {
ProtobufRpcEngine.class);
storageClient = new ContainerOperationClient(
client, new XceiverClientManager(ozoneConf));
cluster.waitForHeartbeatProcessed();
}
@AfterClass

View File

@ -53,6 +53,7 @@ public final class ContainerTestHelper {
private ContainerTestHelper() {
}
// TODO: mock multi-node pipeline
/**
* Create a pipeline with single node replica.
*

View File

@ -105,7 +105,21 @@ public class TestDatanodeStateMachine {
@After
public void tearDown() throws Exception {
try {
executorService.shutdownNow();
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
LOG.error("Unable to shutdown properly.");
}
} catch (InterruptedException e) {
LOG.error("Error attempting to shutdown.", e);
executorService.shutdownNow();
}
}
for (RPC.Server s : scmServers) {
s.stop();
}
@ -122,13 +136,13 @@ public class TestDatanodeStateMachine {
@Test
public void testDatanodeStateMachineStartThread() throws IOException,
InterruptedException, TimeoutException {
DatanodeStateMachine stateMachine =
DatanodeStateMachine.initStateMachine(conf);
SCMConnectionManager connectionManager =
stateMachine.getConnectionManager();
GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
1000, 30000);
stateMachine.close();
try (DatanodeStateMachine stateMachine =
DatanodeStateMachine.initStateMachine(conf)) {
SCMConnectionManager connectionManager =
stateMachine.getConnectionManager();
GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
1000, 30000);
}
}
/**
@ -164,100 +178,101 @@ public class TestDatanodeStateMachine {
@Test
public void testDatanodeStateContext() throws IOException,
InterruptedException, ExecutionException, TimeoutException {
final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
currentState);
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
currentState);
DatanodeState<DatanodeStateMachine.DatanodeStates> task =
stateMachine.getContext().getTask();
Assert.assertEquals(InitDatanodeState.class, task.getClass());
DatanodeState<DatanodeStateMachine.DatanodeStates> task =
stateMachine.getContext().getTask();
Assert.assertEquals(InitDatanodeState.class, task.getClass());
task.execute(executorService);
DatanodeStateMachine.DatanodeStates newState =
task.await(2, TimeUnit.SECONDS);
task.execute(executorService);
DatanodeStateMachine.DatanodeStates newState =
task.await(2, TimeUnit.SECONDS);
for (EndpointStateMachine endpoint :
stateMachine.getConnectionManager().getValues()) {
// We assert that each of the endpoints is in State GETVERSION.
Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
endpoint.getState());
}
for (EndpointStateMachine endpoint :
stateMachine.getConnectionManager().getValues()) {
// We assert that each of the endpoints is in State GETVERSION.
Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
endpoint.getState());
}
// The Datanode has moved into Running State, since endpoints are created.
// We move to running state when we are ready to issue RPC calls to SCMs.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
// The Datanode has moved into Running State, since endpoints are created.
// We move to running state when we are ready to issue RPC calls to SCMs.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
// If we had called context.execute instead of calling into each state
// this would have happened automatically.
stateMachine.getContext().setState(newState);
task = stateMachine.getContext().getTask();
Assert.assertEquals(RunningDatanodeState.class, task.getClass());
// If we had called context.execute instead of calling into each state
// this would have happened automatically.
stateMachine.getContext().setState(newState);
task = stateMachine.getContext().getTask();
Assert.assertEquals(RunningDatanodeState.class, task.getClass());
// This execute will invoke getVersion calls against all SCM endpoints
// that we know of.
// This execute will invoke getVersion calls against all SCM endpoints
// that we know of.
task.execute(executorService);
newState = task.await(10, TimeUnit.SECONDS);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
task.execute(executorService);
newState = task.await(10, TimeUnit.SECONDS);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
for (EndpointStateMachine endpoint :
stateMachine.getConnectionManager().getValues()) {
for (EndpointStateMachine endpoint :
stateMachine.getConnectionManager().getValues()) {
// Since the earlier task.execute called into GetVersion, the
// endPointState Machine should move to REGISTER state.
Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
endpoint.getState());
// Since the earlier task.execute called into GetVersion, the
// endPointState Machine should move to REGISTER state.
Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
endpoint.getState());
// We assert that each of the end points have gotten a version from the
// SCM Server.
Assert.assertNotNull(endpoint.getVersion());
}
// We assert that each of the end points have gotten a version from the
// SCM Server.
Assert.assertNotNull(endpoint.getVersion());
}
// We can also assert that all mock servers have received only one RPC
// call at this point of time.
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getRpcCount());
}
// We can also assert that all mock servers have received only one RPC
// call at this point of time.
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getRpcCount());
}
// This task is the Running task, but running task executes tasks based
// on the state of Endpoints, hence this next call will be a Register at
// the endpoint RPC level.
task = stateMachine.getContext().getTask();
task.execute(executorService);
newState = task.await(2, TimeUnit.SECONDS);
// This task is the Running task, but running task executes tasks based
// on the state of Endpoints, hence this next call will be a Register at
// the endpoint RPC level.
task = stateMachine.getContext().getTask();
task.execute(executorService);
newState = task.await(2, TimeUnit.SECONDS);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(2, mock.getRpcCount());
}
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(2, mock.getRpcCount());
}
// This task is the Running task, but running task executes tasks based
// on the state of Endpoints, hence this next call will be a
// HeartbeatTask at the endpoint RPC level.
task = stateMachine.getContext().getTask();
task.execute(executorService);
newState = task.await(2, TimeUnit.SECONDS);
// This task is the Running task, but running task executes tasks based
// on the state of Endpoints, hence this next call will be a
// HeartbeatTask at the endpoint RPC level.
task = stateMachine.getContext().getTask();
task.execute(executorService);
newState = task.await(2, TimeUnit.SECONDS);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
// If we are in running state, we should be in running.
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
newState);
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getHeartbeatCount());
// Assert that the heartbeat did indeed carry the State that we said
// we have in the datanode.
Assert.assertEquals(mock.getReportState().getState().getNumber(),
StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports.getNumber());
for (ScmTestMock mock : mockServers) {
Assert.assertEquals(1, mock.getHeartbeatCount());
// Assert that the heartbeat did indeed carry the State that we said
// we have in the datanode.
Assert.assertEquals(mock.getReportState().getState().getNumber(),
StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports.getNumber());
}
}
}
@ -276,20 +291,20 @@ public class TestDatanodeStateMachine {
"scm:123456" // Port out of range
}) {
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, name);
final DatanodeStateMachine stateMachine =
new DatanodeStateMachine(conf);
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
currentState);
DatanodeState<DatanodeStateMachine.DatanodeStates> task =
stateMachine.getContext().getTask();
task.execute(executorService);
DatanodeStateMachine.DatanodeStates newState =
task.await(2, TimeUnit.SECONDS);
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
newState);
try (DatanodeStateMachine stateMachine =
new DatanodeStateMachine(conf)) {
DatanodeStateMachine.DatanodeStates currentState =
stateMachine.getContext().getState();
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
currentState);
DatanodeState<DatanodeStateMachine.DatanodeStates> task =
stateMachine.getContext().getTask();
task.execute(executorService);
DatanodeStateMachine.DatanodeStates newState =
task.await(2, TimeUnit.SECONDS);
Assert.assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN,
newState);
}
}
}
}

View File

@ -291,20 +291,21 @@ public class TestEndPoint {
}
}
private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
private void heartbeatTaskHelper(InetSocketAddress scmAddress,
int rpcTimeout) throws Exception {
Configuration conf = SCMTestUtils.getConf();
EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
conf, scmAddress, rpcTimeout);
conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
// Create a datanode state machine for stateContext used by endpoint task
try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
scmAddress, rpcTimeout)) {
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
.setClusterID(UUID.randomUUID().toString())
.setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
.build();
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
// Create a datanode state machine for stateContext used by endpoint task
conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
final StateContext stateContext = new StateContext(conf,
DatanodeStateMachine.DatanodeStates.RUNNING,
stateMachine);
@ -314,27 +315,21 @@ public class TestEndPoint {
endpointTask.setContainerNodeIDProto(containerNodeID);
endpointTask.call();
Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
return rpcEndPoint;
}
private void heartbeatTaskHelper(InetSocketAddress address)
throws Exception {
try (EndpointStateMachine rpcEndpoint =
heartbeatTaskHelper(address, 1000)) {
Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
rpcEndpoint.getState());
Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
rpcEndPoint.getState());
}
}
@Test
public void testHeartbeatTask() throws Exception {
heartbeatTaskHelper(serverAddress);
heartbeatTaskHelper(serverAddress, 1000);
}
@Test
public void testHeartbeatTaskToInvalidNode() throws Exception {
InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
heartbeatTaskHelper(invalidAddress);
heartbeatTaskHelper(invalidAddress, 1000);
}
@Test
@ -344,7 +339,7 @@ public class TestEndPoint {
scmServerImpl.setRpcResponseDelay(1500);
long start = Time.monotonicNow();
InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
heartbeatTaskHelper(invalidAddress);
heartbeatTaskHelper(invalidAddress, 1000);
long end = Time.monotonicNow();
scmServerImpl.setRpcResponseDelay(0);
Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));

View File

@ -53,178 +53,201 @@ public class TestOzoneContainer {
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
OzoneContainer container = null;
MiniOzoneCluster cluster = null;
try {
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
// We don't start Ozone Container via data node, we will do it
// independently in our test path.
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
container = new OzoneContainer(conf);
container.start();
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
// We don't start the Ozone Container via the datanode; we start it
// independently in our test path.
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline(
containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
OzoneContainer container = new OzoneContainer(conf);
container.start();
XceiverClient client = new XceiverClient(pipeline, conf);
client.connect();
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
container.stop();
cluster.shutdown();
XceiverClient client = new XceiverClient(pipeline, conf);
client.connect();
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
} finally {
if (container != null) {
container.stop();
}
if(cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testOzoneContainerViaDataNode() throws Exception {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
MiniOzoneCluster cluster = null;
XceiverClient client = null;
try {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container via datanode create.
// Start ozone container via datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf);
client.connect();
// This client talks to ozone container via datanode.
client = new XceiverClient(pipeline, conf);
client.connect();
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
pipeline.setContainerName(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Write Chunk
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
keyName, 1024);
// Write Chunk
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
keyName, 1024);
response = client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
response = client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Read Chunk
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
.getWriteChunk());
// Read Chunk
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
.getWriteChunk());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Put Key
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper.getPutKeyRequest(writeChunkRequest.getWriteChunk());
response = client.sendCommand(putKeyRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Get Key
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
response = client.sendCommand(request);
ContainerTestHelper.verifyGetKey(request, response);
// Put Key
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
.getWriteChunk());
// Delete Key
request =
ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
response = client.sendCommand(putKeyRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
//Delete Chunk
request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest
.getWriteChunk());
// Get Key
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
response = client.sendCommand(request);
ContainerTestHelper.verifyGetKey(request, response);
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
client.close();
cluster.shutdown();
// Delete Key
request =
ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
//Delete Chunk
request = ContainerTestHelper.getDeleteChunkRequest(writeChunkRequest
.getWriteChunk());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
} finally {
if (client != null) {
client.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testBothGetandPutSmallFile() throws Exception {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
MiniOzoneCluster cluster = null;
XceiverClient client = null;
try {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container via datanode create.
// Start ozone container via datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf);
client.connect();
// This client talks to ozone container via datanode.
client = new XceiverClient(pipeline, conf);
client.connect();
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Create container
ContainerProtos.ContainerCommandRequestProto request =
ContainerTestHelper.getCreateContainerRequest(containerName);
ContainerProtos.ContainerCommandResponseProto response =
client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto smallFileRequest =
ContainerTestHelper.getWriteSmallFileRequest(pipeline, containerName,
keyName, 1024);
ContainerProtos.ContainerCommandRequestProto smallFileRequest =
ContainerTestHelper.getWriteSmallFileRequest(pipeline, containerName,
keyName, 1024);
response = client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto getSmallFileRequest =
ContainerTestHelper.getReadSmallFileRequest(smallFileRequest
.getPutSmallFile().getKey());
response = client.sendCommand(getSmallFileRequest);
Assert.assertArrayEquals(
smallFileRequest.getPutSmallFile().getData().toByteArray(),
response.getGetSmallFile().getData().getData().toByteArray());
cluster.shutdown();
response = client.sendCommand(smallFileRequest);
Assert.assertNotNull(response);
Assert.assertTrue(smallFileRequest.getTraceID()
.equals(response.getTraceID()));
ContainerProtos.ContainerCommandRequestProto getSmallFileRequest =
ContainerTestHelper.getReadSmallFileRequest(smallFileRequest
.getPutSmallFile().getKey());
response = client.sendCommand(getSmallFileRequest);
Assert.assertArrayEquals(
smallFileRequest.getPutSmallFile().getData().toByteArray(),
response.getGetSmallFile().getData().getData().toByteArray());
} finally {
if (client != null) {
client.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
}
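All three tests in this file now follow the same cleanup discipline: the resources are declared null before the try block and released in a finally block guarded by null checks, so a failure during cluster startup cannot leak a client or a cluster. Reduced to its skeleton (the comment stands in for each test's own logic):
MiniOzoneCluster cluster = null;
XceiverClient client = null;
try {
  // ... build the cluster, connect the client, send requests, assert ...
} finally {
  if (client != null) {
    client.close();
  }
  if (cluster != null) {
    cluster.shutdown();
  }
}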

View File

@ -21,6 +21,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.AfterClass;
@ -45,12 +46,15 @@ public class TestAllocateContainer {
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
public static void init() throws Exception {
long datanodeCapacities = 3 * OzoneConsts.TB;
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType("distributed").build();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
cluster.waitForHeartbeatProcessed();
}
@AfterClass
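The new init steps are not incidental: container allocation is now driven by node reports, so a test cluster must both advertise usable capacity and give SCM time to process at least one heartbeat before an allocation can succeed. Condensed from the diff above (the 3 TB figure is simply the capacity this test chooses):
long datanodeCapacities = 3 * OzoneConsts.TB;
OzoneConfiguration conf = new OzoneConfiguration();
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
    .numDataNodes(1)
    .storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
    .setHandlerType("distributed")
    .build();
// Without this wait SCM has no node stats yet, and a capacity-aware
// placement policy would have nothing to allocate against.
cluster.waitForHeartbeatProcessed();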

View File

@ -21,6 +21,10 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.XceiverClientManager;
@ -35,7 +39,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.UUID;
/**
@ -52,13 +55,18 @@ public class TestContainerSmallFile {
private static XceiverClientManager xceiverClientManager;
@BeforeClass
public static void init() throws IOException {
public static void init() throws Exception {
long datanodeCapacities = 3 * OzoneConsts.TB;
ozoneConfig = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(ozoneConfig)
.numDataNodes(1).setHandlerType("distributed").build();
ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
cluster = new MiniOzoneCluster.Builder(ozoneConfig).numDataNodes(1)
.storageCapacities(new long[] {datanodeCapacities, datanodeCapacities})
.setHandlerType("distributed").build();
storageContainerLocationClient = cluster
.createStorageContainerLocationClient();
xceiverClientManager = new XceiverClientManager(ozoneConfig);
cluster.waitForHeartbeatProcessed();
}
@AfterClass

View File

@ -188,14 +188,34 @@ public class MockNodeManager implements NodeManager {
}
/**
* Return a list of node stats.
* Return a map of nodes to their stats.
* @return a map of individual node stats (live/stale but not dead), keyed by datanode UUID.
*/
@Override
public List<SCMNodeStat> getNodeStats() {
public Map<String, SCMNodeStat> getNodeStats() {
return null;
}
/**
* Return the node stat of the specified datanode.
* @param datanodeID - datanode ID.
* @return node stat if it is live/stale, null if it is dead or doesn't exist.
*/
@Override
public SCMNodeStat getNodeStat(DatanodeID datanodeID) {
return null;
}
/**
* Used for testing.
*
* @return true if the HB check is done.
*/
@Override
public boolean waitForHeartbeatProcessed() {
return false;
}
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm.node;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.SCMContainerPlacementCapacity;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for different container placement policies.
*/
public class TestContainerPlacement {
@Rule
public ExpectedException thrown = ExpectedException.none();
/**
* Returns a new copy of Configuration.
*
* @return Config
*/
Configuration getConf() {
return new OzoneConfiguration();
}
/**
* Creates a NodeManager.
*
* @param config - Config for the node manager.
* @return SCMNodeManager
* @throws IOException
*/
SCMNodeManager createNodeManager(Configuration config) throws IOException {
SCMNodeManager nodeManager = new SCMNodeManager(config,
UUID.randomUUID().toString());
assertFalse("Node manager should be in chill mode",
nodeManager.isOutOfNodeChillMode());
return nodeManager;
}
ContainerMapping createContainerManager(Configuration config,
NodeManager scmNodeManager) throws IOException {
final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
return new ContainerMapping(config, scmNodeManager, cacheSize);
}
/**
* Test capacity based container placement policy with node reports.
*
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testContainerPlacementCapacity() throws IOException,
InterruptedException, TimeoutException {
Configuration conf = getConf();
final int nodeCount = 4;
final long capacity = 10L * OzoneConsts.GB;
final long used = 2L * OzoneConsts.GB;
final long remaining = capacity - used;
final File testDir = PathUtils.getTestDir(
TestContainerPlacement.class);
conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS,
testDir.getAbsolutePath());
conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
SCMNodeManager nodeManager = createNodeManager(conf);
ContainerMapping containerManager =
createContainerManager(conf, nodeManager);
List<DatanodeID> datanodes = new ArrayList<>(nodeCount);
for (int i = 0; i < nodeCount; i++) {
datanodes.add(SCMTestUtils.getDatanodeID(nodeManager));
}
try {
for (DatanodeID datanodeID: datanodes) {
StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder();
StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb =
StorageContainerDatanodeProtocolProtos.SCMStorageReport
.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
srb.setCapacity(capacity).setScmUsed(used).
setRemaining(remaining).build();
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build());
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(capacity * nodeCount,
nodeManager.getStats().getCapacity());
assertEquals(used * nodeCount,
nodeManager.getStats().getScmUsed());
assertEquals(remaining * nodeCount,
nodeManager.getStats().getRemaining());
assertTrue(nodeManager.isOutOfNodeChillMode());
String container1 = UUID.randomUUID().toString();
Pipeline pipeline1 = containerManager.allocateContainer(container1,
ScmClient.ReplicationFactor.THREE);
assertEquals(3, pipeline1.getMachines().size());
final long newUsed = 7L * OzoneConsts.GB;
final long newRemaining = capacity - newUsed;
for (DatanodeID datanodeID: datanodes) {
StorageContainerDatanodeProtocolProtos.SCMNodeReport.Builder nrb =
StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder();
StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb =
StorageContainerDatanodeProtocolProtos.SCMStorageReport
.newBuilder();
srb.setStorageUuid(UUID.randomUUID().toString());
srb.setCapacity(capacity).setScmUsed(newUsed).
setRemaining(newRemaining).build();
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build());
}
GenericTestUtils.waitFor(() -> nodeManager.getStats().getRemaining() ==
nodeCount * newRemaining,
100, 4 * 1000);
thrown.expect(IOException.class);
thrown.expectMessage(
startsWith("No healthy node found with enough remaining capacity to" +
" allocate container."));
String container2 = UUID.randomUUID().toString();
containerManager.allocateContainer(container2,
ScmClient.ReplicationFactor.THREE);
} finally {
IOUtils.closeQuietly(containerManager);
IOUtils.closeQuietly(nodeManager);
FileUtil.fullyDelete(testDir);
}
}
}
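The test pins down the policy's observable contract: with 8 GB remaining per node a three-replica allocation succeeds, and after usage grows to 7 GB (3 GB remaining) the same request fails with the "No healthy node found..." message. The selection rule this implies looks roughly like the sketch below; this is illustrative pseudologic under those assumptions, not the body of SCMContainerPlacementCapacity:
// Illustrative only: keep healthy nodes with enough remaining space,
// then pick the required replica count from the survivors.
List<DatanodeID> chooseDatanodes(NodeManager nodeManager, int required,
    long sizeNeeded) throws IOException {
  List<DatanodeID> candidates = new ArrayList<>();
  for (DatanodeID node : nodeManager.getNodes(HEALTHY)) {
    SCMNodeStat stat = nodeManager.getNodeStat(node);
    if (stat != null && stat.getRemaining() >= sizeNeeded) {
      candidates.add(node);
    }
  }
  if (candidates.size() < required) {
    throw new IOException("No healthy node found with enough remaining" +
        " capacity to allocate container.");
  }
  // A capacity-based policy would weight the pick toward emptier nodes;
  // a shuffle stands in for that weighting here.
  Collections.shuffle(candidates);
  return candidates.subList(0, required);
}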

View File

@ -111,11 +111,11 @@ public class TestNodeManager {
}
// Wait for 4 seconds max.
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertTrue("Heartbeat thread should have picked up the scheduled " +
"heartbeats and transitioned out of chill mode.",
assertTrue("Heartbeat thread should have picked up the" +
"scheduled heartbeats and transitioned out of chill mode.",
nodeManager.isOutOfNodeChillMode());
}
}
@ -132,10 +132,10 @@ public class TestNodeManager {
InterruptedException, TimeoutException {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("No heartbeats, Node manager should have been in chill mode.",
nodeManager.isOutOfNodeChillMode());
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertFalse("No heartbeats, Node manager should have been in" +
" chill mode.", nodeManager.isOutOfNodeChillMode());
}
}
@ -154,10 +154,10 @@ public class TestNodeManager {
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager), null);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have been in " +
"chillmode.", nodeManager.isOutOfNodeChillMode());
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have" +
"been in chillmode.", nodeManager.isOutOfNodeChillMode());
}
}
@ -182,10 +182,10 @@ public class TestNodeManager {
nodeManager.sendHeartbeat(datanodeID, null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough nodes have send heartbeat to node manager.",
nodeManager.isOutOfNodeChillMode());
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertFalse("Not enough nodes have send heartbeat to node" +
"manager.", nodeManager.isOutOfNodeChillMode());
}
}
@ -237,8 +237,8 @@ public class TestNodeManager {
DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID, null);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertEquals(count, nodeManager.getNodeCount(HEALTHY));
}
}
@ -339,9 +339,10 @@ public class TestNodeManager {
List<DatanodeID> staleNodeList = nodeManager.getNodes(NodeManager
.NODESTATE.STALE);
assertEquals("Expected to find 1 stale node", 1, nodeManager
.getNodeCount(STALE));
assertEquals("Expected to find 1 stale node", 1, staleNodeList.size());
assertEquals("Expected to find 1 stale node",
1, nodeManager.getNodeCount(STALE));
assertEquals("Expected to find 1 stale node",
1, staleNodeList.size());
assertEquals("Stale node is not the expected ID", staleNode
.getDatanodeUuid(), staleNodeList.get(0).getDatanodeUuid());
}
@ -403,7 +404,8 @@ public class TestNodeManager {
List<DatanodeID> deadNodeList = nodeManager.getNodes(DEAD);
assertEquals("Expected to find 1 dead node", 1,
nodeManager.getNodeCount(DEAD));
assertEquals("Expected to find 1 dead node", 1, deadNodeList.size());
assertEquals("Expected to find 1 dead node",
1, deadNodeList.size());
assertEquals("Dead node is not the expected ID", deadNode
.getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid());
}
@ -424,8 +426,8 @@ public class TestNodeManager {
GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
nodeManager.sendHeartbeat(null, null);
logCapturer.stopCapturing();
assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
"heartbeat is null"));
assertThat(logCapturer.getOutput(),
containsString("Datanode ID in heartbeat is null"));
}
}
@ -569,15 +571,18 @@ public class TestNodeManager {
assertEquals(1, nodeManager.getNodeCount(STALE));
assertEquals(1, nodeManager.getNodeCount(DEAD));
assertEquals("Expected one healthy node", 1, healthyList.size());
assertEquals("Expected one healthy node",
1, healthyList.size());
assertEquals("Healthy node is not the expected ID", healthyNode
.getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
assertEquals("Expected one stale node", 1, staleList.size());
assertEquals("Expected one stale node",
1, staleList.size());
assertEquals("Stale node is not the expected ID", staleNode
.getDatanodeUuid(), staleList.get(0).getDatanodeUuid());
assertEquals("Expected one dead node", 1, deadList.size());
assertEquals("Expected one dead node",
1, deadList.size());
assertEquals("Dead node is not the expected ID", deadNode
.getDatanodeUuid(), deadList.get(0).getDatanodeUuid());
/**
@ -781,8 +786,8 @@ public class TestNodeManager {
GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
500, 20 * 1000);
assertEquals("Node count mismatch", healthyCount + staleCount, nodeManager
.getAllNodes().size());
assertEquals("Node count mismatch",
healthyCount + staleCount, nodeManager.getAllNodes().size());
thread1.interrupt();
thread2.interrupt();
@ -921,8 +926,8 @@ public class TestNodeManager {
nodeManager.sendHeartbeat(datanodeID,
nrb.addStorageReport(srb).build());
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatProcessed(),
100, 4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeCount(HEALTHY));
assertEquals(capacity * nodeCount,
nodeManager.getStats().getCapacity());
@ -984,11 +989,18 @@ public class TestNodeManager {
// Test NodeManager#getNodeStats
assertEquals(nodeCount, nodeManager.getNodeStats().size());
assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity());
assertEquals(expectedScmUsed,
nodeManager.getNodeStats().get(0).getScmUsed());
nodeManager.getNodeStat(datanodeID).getScmUsed());
assertEquals(expectedRemaining,
nodeManager.getNodeStats().get(0).getRemaining());
nodeManager.getNodeStat(datanodeID).getRemaining());
// Compare the result from
// NodeManager#getNodeStats and NodeManager#getNodeStat
SCMNodeStat stat1 = nodeManager.getNodeStats().
get(datanodeID.getDatanodeUuid());
SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeID);
assertEquals(stat1, stat2);
// Wait up to 4s so that the node becomes stale
// Verify the usage info should be unchanged.
@ -996,11 +1008,11 @@ public class TestNodeManager {
() -> nodeManager.getNodeCount(NodeManager.NODESTATE.STALE) == 1, 100,
4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeStats().size());
assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity());
assertEquals(expectedScmUsed,
nodeManager.getNodeStats().get(0).getScmUsed());
nodeManager.getNodeStat(datanodeID).getScmUsed());
assertEquals(expectedRemaining,
nodeManager.getNodeStats().get(0).getRemaining());
nodeManager.getNodeStat(datanodeID).getRemaining());
// Wait up to 4 more seconds so the node becomes dead
// Verify usage info should be updated.
@ -1031,11 +1043,11 @@ public class TestNodeManager {
() -> nodeManager.getStats().getScmUsed() == expectedScmUsed, 100,
4 * 1000);
assertEquals(nodeCount, nodeManager.getNodeStats().size());
assertEquals(capacity, nodeManager.getNodeStats().get(0).getCapacity());
assertEquals(capacity, nodeManager.getNodeStat(datanodeID).getCapacity());
assertEquals(expectedScmUsed,
nodeManager.getNodeStats().get(0).getScmUsed());
nodeManager.getNodeStat(datanodeID).getScmUsed());
assertEquals(expectedRemaining,
nodeManager.getNodeStats().get(0).getRemaining());
nodeManager.getNodeStat(datanodeID).getRemaining());
}
}
}
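Every stat asserted in this file originates from a heartbeat carrying an SCMNodeReport. The builder chain is repeated many times in this diff; condensed into one readable unit (the capacity and usage values are arbitrary):
long capacity = 10L * OzoneConsts.GB;
long used = 2L * OzoneConsts.GB;
StorageContainerDatanodeProtocolProtos.SCMStorageReport.Builder srb =
    StorageContainerDatanodeProtocolProtos.SCMStorageReport.newBuilder()
        .setStorageUuid(UUID.randomUUID().toString())
        .setCapacity(capacity)
        .setScmUsed(used)
        .setRemaining(capacity - used);
StorageContainerDatanodeProtocolProtos.SCMNodeReport report =
    StorageContainerDatanodeProtocolProtos.SCMNodeReport.newBuilder()
        .addStorageReport(srb)
        .build();
// SCM folds the report into both the per-node SCMNodeStat and the
// cluster-wide aggregate returned by getStats().
nodeManager.sendHeartbeat(datanodeID, report);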

View File

@ -268,7 +268,7 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
//@Test
public void testCreateVolumesInLoop() throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.web.client;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -156,7 +158,8 @@ public class TestVolume {
assertTrue(ovols.size() >= 10);
}
@Test
//@Test
// Takes 3 minutes to run; disabled for now.
public void testListVolumePagination() throws OzoneException, IOException {
final int volCount = 2000;
final int step = 100;
@ -179,15 +182,16 @@ public class TestVolume {
Assert.assertEquals(volCount / step, pagecount);
}
@Test
//@Test
public void testListAllVolumes() throws OzoneException, IOException {
final int volCount = 200;
final int step = 10;
client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER);
for (int x = 0; x < volCount; x++) {
String userName = "frodo" + x;
String volumeName = "vol"+ x;
String userName = "frodo" +
RandomStringUtils.randomAlphabetic(5).toLowerCase();
String volumeName = "vol" +
RandomStringUtils.randomAlphabetic(5).toLowerCase();
OzoneVolume vol = client.createVolume(volumeName, userName, "100TB");
assertNotNull(vol);
}