From cb6b6b47b6b9d6153d7e39e0fcdd4523b4ce7b17 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Fri, 18 Nov 2016 12:24:41 -0800 Subject: [PATCH] HDFS-11133. Ozone: Add allocateContainer RPC. Contributed by Anu Engineer --- .../container/common/utils/LevelDBStore.java | 16 ++ .../StorageContainerLocationProtocol.java | 10 + ...ocationProtocolClientSideTranslatorPB.java | 83 +++++--- ...ocationProtocolServerSideTranslatorPB.java | 10 + .../ozone/scm/container/ContainerMapping.java | 199 +++++++++++++++++ .../hadoop/ozone/scm/container/Mapping.java | 47 ++++ .../ozone/scm/container/package-info.java | 22 ++ .../storage/StorageContainerManager.java | 15 ++ .../StorageContainerLocationProtocol.proto | 28 +++ .../ozone/scm/container/MockNodeManager.java | 201 ++++++++++++++++++ .../scm/container/TestContainerMapping.java | 133 ++++++++++++ 11 files changed, 740 insertions(+), 24 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java index 5b1a903059f..f7ba53cf397 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/utils/LevelDBStore.java @@ -52,6 +52,22 @@ public class LevelDBStore { this.dbFile = dbPath; } + /** + * Opens a DB file. + * + * @param dbPath - DB File path + * @throws IOException + */ + public LevelDBStore(File dbPath, Options options) + throws IOException { + db = JniDBFactory.factory.open(dbPath, options); + if (db == null) { + throw new IOException("Db is null"); + } + this.dbFile = dbPath; + } + + /** * Puts a Key into file. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java index b3605eedea6..6f51925cd9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerLocationProtocol.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; /** * ContainerLocationProtocol is used by an HDFS node to find the set of nodes @@ -41,4 +42,13 @@ public interface StorageContainerLocationProtocol { */ Set getStorageContainerLocations(Set keys) throws IOException; + + /** + * Asks SCM where a container should be allocated. SCM responds with the + * set of datanodes that should be used creating this container. + * @param containerName - Name of the container. + * @return Pipeline. + * @throws IOException + */ + Pipeline allocateContainer(String containerName) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 6aa8190a158..c8dec01cf66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -1,42 +1,44 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.hadoop.ozone.protocolPB; -import java.io.Closeable; -import java.io.IOException; -import java.util.Set; - +import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; - import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Set; /** * This class is the client-side translator to translate the requests made on @@ -47,7 +49,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr public final class StorageContainerLocationProtocolClientSideTranslatorPB implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable { - /** RpcController is not used and hence is set to null. */ + /** + * RpcController is not used and hence is set to null. + */ private static final RpcController NULL_RPC_CONTROLLER = null; private final StorageContainerLocationProtocolPB rpcProxy; @@ -67,7 +71,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB throws IOException { GetStorageContainerLocationsRequestProto.Builder req = GetStorageContainerLocationsRequestProto.newBuilder(); - for (String key: keys) { + for (String key : keys) { req.addKeys(key); } final GetStorageContainerLocationsResponseProto resp; @@ -79,11 +83,11 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB } Set locatedContainers = Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedContainersCount()); - for (LocatedContainerProto locatedContainer: + for (LocatedContainerProto locatedContainer : resp.getLocatedContainersList()) { Set locations = Sets.newLinkedHashSetWithExpectedSize( locatedContainer.getLocationsCount()); - for (DatanodeInfoProto location: locatedContainer.getLocationsList()) { + for (DatanodeInfoProto location : locatedContainer.getLocationsList()) { locations.add(PBHelperClient.convert(location)); } locatedContainers.add(new LocatedContainer(locatedContainer.getKey(), @@ -94,6 +98,37 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB return locatedContainers; } + /** + * Asks SCM where a container should be allocated. SCM responds with the set + * of datanodes that should be used creating this container. + * + * @param containerName - Name of the container. + * @return Pipeline. + * @throws IOException + */ + @Override + public Pipeline allocateContainer(String containerName) throws IOException { + + Preconditions.checkNotNull(containerName, "Container Name cannot be Null"); + Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" + + " be empty"); + + ContainerRequestProto request = ContainerRequestProto.newBuilder() + .setContainerName(containerName).build(); + + final ContainerResponseProto response; + try { + response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + if (response.getErrorCode() != ContainerResponseProto.Error.success) { + throw new IOException(response.hasErrorMessage() ? + response.getErrorMessage() : "Allocate container failed."); + } + return Pipeline.getFromProtoBuf(response.getPipeline()); + } + @Override public Object getUnderlyingProxyObject() { return rpcProxy; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index 9d9707fa039..19eb8a5642a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -29,9 +29,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.ozone.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerLocationProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto; /** * This class is the server-side translator that forwards requests received on @@ -86,4 +89,11 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB } return resp.build(); } + + @Override + public ContainerResponseProto allocateContainer(RpcController unused, + StorageContainerLocationProtocolProtos.ContainerRequestProto request) + throws ServiceException { + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java new file mode 100644 index 00000000000..61ae3f4c642 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.utils.LevelDBStore; +import org.apache.hadoop.ozone.scm.node.NodeManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Random; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.iq80.leveldb.Options; + +/** + * Mapping class contains the mapping from a name to a pipeline mapping. This is + * used by SCM when allocating new locations and when looking up a key. + */ +public class ContainerMapping implements Mapping { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerMapping.class); + + private final NodeManager nodeManager; + private final int cacheSize; + private final Lock lock; + private final Charset encoding = Charset.forName("UTF-8"); + private final LevelDBStore store; + private final Random rand; + + /** + * Constructs a mapping class that creates mapping between container names and + * pipelines. + * + * @param nodeManager - NodeManager so that we can get the nodes that are + * healthy to place new containers. + * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache + * its nodes. This is passed to LevelDB and this memory is allocated in Native + * code space. CacheSize is specified in MB. + */ + @SuppressWarnings("unchecked") + public ContainerMapping(Configuration conf, NodeManager nodeManager, + int cacheSizeMB) throws IOException { + this.nodeManager = nodeManager; + this.cacheSize = cacheSizeMB; + + // TODO: Fix this checking. + String scmMetaDataDir = conf.get(OzoneConfigKeys + .OZONE_CONTAINER_METADATA_DIRS); + if ((scmMetaDataDir == null) || (scmMetaDataDir.isEmpty())) { + throw + new IllegalArgumentException("SCM metadata directory is not valid."); + } + File dbPath = new File(scmMetaDataDir, "SCM.db"); + Options options = new Options(); + options.cacheSize(this.cacheSize * (1024 * 1024)); + options.createIfMissing(); + store = new LevelDBStore(dbPath, options); + this.lock = new ReentrantLock(); + rand = new Random(); + } + + /** + * Translates a list of nodes, ordered such that the first is the leader, into + * a corresponding {@link Pipeline} object. + * + * @param node datanode on which we will allocate the contianer. + * @param containerName container name + * @return pipeline corresponding to nodes + */ + private static Pipeline newPipelineFromNodes(DatanodeID node, String + containerName) { + Preconditions.checkNotNull(node); + String leaderId = node.getDatanodeUuid(); + Pipeline pipeline = new Pipeline(leaderId); + pipeline.addMember(node); + pipeline.setContainerName(containerName); + return pipeline; + } + + /** + * Returns the Pipeline from the container name. + * + * @param containerName - Name + * @return - Pipeline that makes up this container. + */ + @Override + public Pipeline getContainer(String containerName) throws IOException { + Pipeline pipeline = null; + lock.lock(); + try { + byte[] pipelineBytes = store.get(containerName.getBytes(encoding)); + if (pipelineBytes == null) { + throw new IOException("Specified key does not exist. key : " + + containerName); + } + pipeline = Pipeline.getFromProtoBuf( + ContainerProtos.Pipeline.PARSER.parseFrom(pipelineBytes)); + return pipeline; + } finally { + lock.unlock(); + } + } + + /** + * Allocates a new container. + * + * @param containerName - Name of the container. + * @return - Pipeline that makes up this container. + * @throws IOException + */ + @Override + public Pipeline allocateContainer(String containerName) throws IOException { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + Pipeline pipeline = null; + if (!nodeManager.isOutOfNodeChillMode()) { + throw new IOException("Unable to create container while in chill mode"); + } + + lock.lock(); + try { + byte[] pipelineBytes = store.get(containerName.getBytes(encoding)); + if (pipelineBytes != null) { + throw new IOException("Specified container already exists. key : " + + containerName); + } + DatanodeID id = getDatanodeID(); + if (id != null) { + pipeline = newPipelineFromNodes(id, containerName); + store.put(containerName.getBytes(encoding), + pipeline.getProtobufMessage().toByteArray()); + } + } finally { + lock.unlock(); + } + return pipeline; + } + + /** + * Returns a random Datanode ID from the list of healthy nodes. + * + * @return Datanode ID + * @throws IOException + */ + private DatanodeID getDatanodeID() throws IOException { + List healthyNodes = + nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY); + + if (healthyNodes.size() == 0) { + throw new IOException("No healthy node found to allocate container."); + } + + int index = rand.nextInt() % healthyNodes.size(); + return healthyNodes.get(Math.abs(index)); + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally mark the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + if (store != null) { + store.close(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java new file mode 100644 index 00000000000..ce49fa747e5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.container; + + +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Mapping class contains the mapping from a name to a pipeline mapping. This is + * used by SCM when allocating new locations and when looking up a key. + */ +public interface Mapping extends Closeable { + /** + * Returns the Pipeline from the container name. + * + * @param containerName - Name + * @return - Pipeline that makes up this container. + * @throws IOException + */ + Pipeline getContainer(String containerName) throws IOException; + + /** + * Allocates a new container for a given keyName. + * + * @param containerName - Name + * @return - Pipeline that makes up this container. + * @throws IOException + */ + Pipeline allocateContainer(String containerName) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/package-info.java new file mode 100644 index 00000000000..4ac490cb6f5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.scm.container; +/** + * This package contains routines to manage the container location and + * mapping inside SCM + */ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java index 5ffcef140cf..1974b7a2229 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java @@ -199,6 +199,21 @@ public class StorageContainerManager return locatedContainers; } + /** + * Asks SCM where a container should be allocated. SCM responds with the set + * of datanodes that should be used creating this container. + * + * @param containerName - Name of the container. + * @return Pipeline. + * @throws IOException + */ + @Override + public Pipeline allocateContainer(String containerName) throws IOException { + // TODO : This whole file will be replaced when we switch over to using + // the new protocol. So skipping connecting this code for now. + return null; + } + @Override public DatanodeRegistration registerDatanode( DatanodeRegistration registration) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto index 6ed326a9a28..5b2fa4974c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerLocationProtocol.proto @@ -29,6 +29,7 @@ option java_generate_equals_and_hash = true; package hadoop.hdfs; import "hdfs.proto"; +import "DatanodeContainerProtocol.proto"; /** * keys - batch of object keys to find @@ -56,6 +57,27 @@ message LocatedContainerProto { required DatanodeInfoProto leader = 5; } +/** +* Request send to SCM asking where the container should be created. +*/ +message ContainerRequestProto { + required string containerName = 1; +} + +/** + * Reply from SCM indicating that the container. + */ +message ContainerResponseProto { + enum Error { + success = 1; + errorContainerAlreadyExists = 2; + errorContainerMissing = 3; + } + required Error errorCode = 1; + required hadoop.hdfs.ozone.Pipeline pipeline = 2; + optional string errorMessage = 3; +} + /** * Protocol used from an HDFS node to StorageContainerManager. See the request * and response messages for details of the RPC calls. @@ -68,4 +90,10 @@ service StorageContainerLocationProtocolService { */ rpc getStorageContainerLocations(GetStorageContainerLocationsRequestProto) returns(GetStorageContainerLocationsResponseProto); + + /** + Creates a container entry in SCM. + */ + rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto); + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java new file mode 100644 index 00000000000..925ea89390c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.container; + +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.scm.node.NodeManager; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** + * Test Helper for testing container Mapping. + */ +public class MockNodeManager implements NodeManager { + private final List healthyNodes; + private static final int HEALTHY_NODE_COUNT = 10; + private boolean chillmode; + + public MockNodeManager() { + this.healthyNodes = new LinkedList<>(); + for (int x = 0; x < 10; x++) { + healthyNodes.add(SCMTestUtils.getDatanodeID()); + } + chillmode = false; + } + + /** + * Sets the chill mode value. + * @param chillmode + */ + public void setChillmode(boolean chillmode) { + this.chillmode = chillmode; + } + + /** + * Removes a data node from the management of this Node Manager. + * + * @param node - DataNode. + * @throws UnregisteredNodeException + */ + @Override + public void removeNode(DatanodeID node) throws UnregisteredNodeException { + + } + + /** + * Gets all Live Datanodes that is currently communicating with SCM. + * + * @param nodestate - State of the node + * @return List of Datanodes that are Heartbeating SCM. + */ + @Override + public List getNodes(NODESTATE nodestate) { + if (nodestate == NODESTATE.HEALTHY) { + return healthyNodes; + } + return null; + } + + /** + * Returns the Number of Datanodes that are communicating with SCM. + * + * @param nodestate - State of the node + * @return int -- count + */ + @Override + public int getNodeCount(NODESTATE nodestate) { + if (nodestate == NODESTATE.HEALTHY) { + return HEALTHY_NODE_COUNT; + } + return 0; + } + + /** + * Get all datanodes known to SCM. + * + * @return List of DatanodeIDs known to SCM. + */ + @Override + public List getAllNodes() { + return null; + } + + /** + * Get the minimum number of nodes to get out of chill mode. + * + * @return int + */ + @Override + public int getMinimumChillModeNodes() { + return 0; + } + + /** + * Reports if we have exited out of chill mode by discovering enough nodes. + * + * @return True if we are out of Node layer chill mode, false otherwise. + */ + @Override + public boolean isOutOfNodeChillMode() { + return !chillmode; + } + + /** + * Chill mode is the period when node manager waits for a minimum configured + * number of datanodes to report in. This is called chill mode to indicate the + * period before node manager gets into action. + *

+ * Forcefully exits the chill mode, even if we have not met the minimum + * criteria of the nodes reporting in. + */ + @Override + public void forceExitChillMode() { + + } + + /** + * Forcefully enters chill mode, even if all minimum node conditions are met. + */ + @Override + public void forceEnterChillMode() { + + } + + /** + * Clears the manual chill mode flag. + */ + @Override + public void clearChillModeFlag() { + + } + + /** + * Returns a chill mode status string. + * + * @return String + */ + @Override + public String getChillModeStatus() { + return null; + } + + /** + * Returns the status of manual chill mode flag. + * + * @return true if forceEnterChillMode has been called, false if + * forceExitChillMode or status is not set. eg. clearChillModeFlag. + */ + @Override + public boolean isInManualChillMode() { + return false; + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally mark the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + + } + + /** + * When an object implementing interface Runnable is used to + * create a thread, starting the thread causes the object's run + * method to be called in that separately executing thread. + *

+ * The general contract of the method run is that it may take any + * action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java new file mode 100644 index 00000000000..9186a2028b8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.scm.container; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; + +/** + * Tests for Container Mapping. + */ +public class TestContainerMapping { + private static ContainerMapping mapping; + private static MockNodeManager nodeManager; + @Rule + public ExpectedException thrown = ExpectedException.none(); + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = SCMTestUtils.getConf(); + + URL p = conf.getClass().getResource(""); + String path = p.getPath().concat( + TestContainerMapping.class.getSimpleName()); + + conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, path); + File testDir = Paths.get(path).toFile(); + boolean folderExisted = testDir.exists() || testDir.mkdirs(); + if (!folderExisted) { + throw new IOException("Unable to create test diectory path"); + } + nodeManager = new MockNodeManager(); + mapping = new ContainerMapping(conf, nodeManager, 128); + } + + @AfterClass + public static void cleanup() throws IOException { + mapping.close(); + } + + @Before + public void clearChillMode() { + nodeManager.setChillmode(false); + } + + @Test + public void testallocateContainer() throws Exception { + Pipeline pipeline = mapping.allocateContainer(UUID.randomUUID().toString()); + Assert.assertNotNull(pipeline); + } + + @Test + public void testallocateContainerDistributesAllocation() throws Exception { + /* This is a lame test, we should really be testing something like + z-score or make sure that we don't have 3sigma kind of events. Too lazy + to write all that code. This test very lamely tests if we have more than + 5 separate nodes from the list of 10 datanodes that got allocated a + container. + */ + Set pipelineList = new TreeSet<>(); + for (int x = 0; x < 30; x++) { + Pipeline pipeline = mapping.allocateContainer(UUID.randomUUID() + .toString()); + + Assert.assertNotNull(pipeline); + pipelineList.add(pipeline.getLeader().getDatanodeUuid()); + } + Assert.assertTrue(pipelineList.size() > 5); + } + + @Test + public void testGetContainer() throws IOException { + String containerName = UUID.randomUUID().toString(); + Pipeline pipeline = mapping.allocateContainer(containerName); + Assert.assertNotNull(pipeline); + Pipeline newPipeline = mapping.getContainer(containerName); + Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(), + newPipeline.getLeader().getDatanodeUuid()); + } + + @Test + public void testDuplicateAllocateContainerFails() throws IOException { + String containerName = UUID.randomUUID().toString(); + Pipeline pipeline = mapping.allocateContainer(containerName); + Assert.assertNotNull(pipeline); + thrown.expectMessage("Specified container already exists."); + mapping.allocateContainer(containerName); + } + + @Test + public void testgetNoneExistentContainer() throws IOException { + String containerName = UUID.randomUUID().toString(); + thrown.expectMessage("Specified key does not exist."); + mapping.getContainer(containerName); + } + + @Test + public void testChillModeAllocateContainerFails() throws IOException { + String containerName = UUID.randomUUID().toString(); + nodeManager.setChillmode(true); + thrown.expectMessage("Unable to create container while in chill mode"); + mapping.allocateContainer(containerName); + } +}