HDFS-11133. Ozone: Add allocateContainer RPC. Contributed by Anu Engineer
commit 48db56adea (parent ca12aac5a4)
LevelDBStore.java:

@@ -52,6 +52,22 @@ public class LevelDBStore {
     this.dbFile = dbPath;
   }
 
+  /**
+   * Opens a DB file.
+   *
+   * @param dbPath - DB File path
+   * @throws IOException
+   */
+  public LevelDBStore(File dbPath, Options options)
+      throws IOException {
+    db = JniDBFactory.factory.open(dbPath, options);
+    if (db == null) {
+      throw new IOException("Db is null");
+    }
+    this.dbFile = dbPath;
+  }
+
   /**
    * Puts a Key into file.
    *
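For orientation, a minimal usage sketch of the new constructor (illustrative only, not part of the patch: the path and cache size are invented, while put, get, and close are the LevelDBStore operations this commit itself relies on; java.io.File, org.iq80.leveldb.Options, and java.nio.charset.StandardCharsets are assumed imported):

File dbPath = new File("/tmp/scm-demo", "SCM.db");   // hypothetical location
Options options = new Options();
options.createIfMissing();
options.cacheSize(128L * 1024 * 1024);               // 128 MB LevelDB cache
LevelDBStore store = new LevelDBStore(dbPath, options);
try {
  // Round-trip one key; the same pattern ContainerMapping uses below.
  store.put("key".getBytes(StandardCharsets.UTF_8),
      "value".getBytes(StandardCharsets.UTF_8));
  byte[] value = store.get("key".getBytes(StandardCharsets.UTF_8));
} finally {
  store.close();
}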
StorageContainerLocationProtocol.java:

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 /**
  * ContainerLocationProtocol is used by an HDFS node to find the set of nodes
@@ -41,4 +42,13 @@ public interface StorageContainerLocationProtocol {
    */
   Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
       throws IOException;
+
+  /**
+   * Asks SCM where a container should be allocated. SCM responds with the
+   * set of datanodes that should be used for creating this container.
+   * @param containerName - Name of the container.
+   * @return Pipeline.
+   * @throws IOException
+   */
+  Pipeline allocateContainer(String containerName) throws IOException;
 }
StorageContainerLocationProtocolClientSideTranslatorPB.java:

@@ -1,42 +1,44 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 package org.apache.hadoop.ozone.protocolPB;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Set;
+import com.google.common.base.Preconditions;
 
 import com.google.common.collect.Sets;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.LocatedContainer;
 import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
 
 /**
  * This class is the client-side translator to translate the requests made on
@@ -47,7 +49,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
 public final class StorageContainerLocationProtocolClientSideTranslatorPB
     implements StorageContainerLocationProtocol, ProtocolTranslator, Closeable {
 
-  /** RpcController is not used and hence is set to null. */
+  /**
+   * RpcController is not used and hence is set to null.
+   */
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
   private final StorageContainerLocationProtocolPB rpcProxy;
@@ -67,7 +71,7 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
       throws IOException {
     GetStorageContainerLocationsRequestProto.Builder req =
         GetStorageContainerLocationsRequestProto.newBuilder();
-    for (String key: keys) {
+    for (String key : keys) {
       req.addKeys(key);
     }
     final GetStorageContainerLocationsResponseProto resp;
@@ -79,11 +83,11 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
     }
     Set<LocatedContainer> locatedContainers =
         Sets.newLinkedHashSetWithExpectedSize(resp.getLocatedContainersCount());
-    for (LocatedContainerProto locatedContainer:
+    for (LocatedContainerProto locatedContainer :
         resp.getLocatedContainersList()) {
       Set<DatanodeInfo> locations = Sets.newLinkedHashSetWithExpectedSize(
           locatedContainer.getLocationsCount());
-      for (DatanodeInfoProto location: locatedContainer.getLocationsList()) {
+      for (DatanodeInfoProto location : locatedContainer.getLocationsList()) {
         locations.add(PBHelperClient.convert(location));
       }
       locatedContainers.add(new LocatedContainer(locatedContainer.getKey(),
@@ -94,6 +98,37 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
     return locatedContainers;
   }
 
+  /**
+   * Asks SCM where a container should be allocated. SCM responds with the set
+   * of datanodes that should be used for creating this container.
+   *
+   * @param containerName - Name of the container.
+   * @return Pipeline.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline allocateContainer(String containerName) throws IOException {
+
+    Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
+    Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
+        " be empty");
+
+    ContainerRequestProto request = ContainerRequestProto.newBuilder()
+        .setContainerName(containerName).build();
+
+    final ContainerResponseProto response;
+    try {
+      response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    if (response.getErrorCode() != ContainerResponseProto.Error.success) {
+      throw new IOException(response.hasErrorMessage() ?
+          response.getErrorMessage() : "Allocate container failed.");
+    }
+    return Pipeline.getFromProtoBuf(response.getPipeline());
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
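A sketch of how a caller reaches the new RPC through this translator (illustrative, assuming the class's existing constructor that wraps an already-built StorageContainerLocationProtocolPB proxy named rpcProxy; the container name is invented):

StorageContainerLocationProtocol client =
    new StorageContainerLocationProtocolClientSideTranslatorPB(rpcProxy);
// Throws IOException if SCM reports anything other than Error.success.
Pipeline pipeline = client.allocateContainer("demo-container");
String leaderUuid = pipeline.getLeader().getDatanodeUuid();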
StorageContainerLocationProtocolServerSideTranslatorPB.java:

@@ -29,9 +29,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.ozone.protocol.LocatedContainer;
 import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -86,4 +89,11 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
     }
     return resp.build();
   }
+
+  @Override
+  public ContainerResponseProto allocateContainer(RpcController unused,
+      StorageContainerLocationProtocolProtos.ContainerRequestProto request)
+      throws ServiceException {
+    return null;
+  }
 }
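The server-side stub above intentionally returns null for now. A hypothetical completion (an assumption, not in this commit) would delegate to the wrapped StorageContainerLocationProtocol implementation, here called impl, and translate the result back into the response message:

// Hypothetical wiring of the stub; not part of this patch.
@Override
public ContainerResponseProto allocateContainer(RpcController unused,
    StorageContainerLocationProtocolProtos.ContainerRequestProto request)
    throws ServiceException {
  try {
    Pipeline pipeline = impl.allocateContainer(request.getContainerName());
    return ContainerResponseProto.newBuilder()
        .setErrorCode(ContainerResponseProto.Error.success)
        .setPipeline(pipeline.getProtobufMessage())
        .build();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}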
ContainerMapping.java (new file):

@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.iq80.leveldb.Options;
+
+/**
+ * Mapping class contains the mapping from a name to a pipeline mapping. This is
+ * used by SCM when allocating new locations and when looking up a key.
+ */
+public class ContainerMapping implements Mapping {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerMapping.class);
+
+  private final NodeManager nodeManager;
+  private final int cacheSize;
+  private final Lock lock;
+  private final Charset encoding = Charset.forName("UTF-8");
+  private final LevelDBStore store;
+  private final Random rand;
+
+  /**
+   * Constructs a mapping class that creates mapping between container names and
+   * pipelines.
+   *
+   * @param nodeManager - NodeManager so that we can get the nodes that are
+   * healthy to place new containers.
+   * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
+   * its nodes. This is passed to LevelDB and this memory is allocated in Native
+   * code space. CacheSize is specified in MB.
+   */
+  @SuppressWarnings("unchecked")
+  public ContainerMapping(Configuration conf, NodeManager nodeManager,
+      int cacheSizeMB) throws IOException {
+    this.nodeManager = nodeManager;
+    this.cacheSize = cacheSizeMB;
+
+    // TODO: Fix this checking.
+    String scmMetaDataDir = conf.get(OzoneConfigKeys
+        .OZONE_CONTAINER_METADATA_DIRS);
+    if ((scmMetaDataDir == null) || (scmMetaDataDir.isEmpty())) {
+      throw
+          new IllegalArgumentException("SCM metadata directory is not valid.");
+    }
+    File dbPath = new File(scmMetaDataDir, "SCM.db");
+    Options options = new Options();
+    options.cacheSize(this.cacheSize * (1024 * 1024));
+    options.createIfMissing();
+    store = new LevelDBStore(dbPath, options);
+    this.lock = new ReentrantLock();
+    rand = new Random();
+  }
+
+  /**
+   * Translates a list of nodes, ordered such that the first is the leader, into
+   * a corresponding {@link Pipeline} object.
+   *
+   * @param node datanode on which we will allocate the container.
+   * @param containerName container name
+   * @return pipeline corresponding to nodes
+   */
+  private static Pipeline newPipelineFromNodes(DatanodeID node, String
+      containerName) {
+    Preconditions.checkNotNull(node);
+    String leaderId = node.getDatanodeUuid();
+    Pipeline pipeline = new Pipeline(leaderId);
+    pipeline.addMember(node);
+    pipeline.setContainerName(containerName);
+    return pipeline;
+  }
+
+  /**
+   * Returns the Pipeline from the container name.
+   *
+   * @param containerName - Name
+   * @return - Pipeline that makes up this container.
+   */
+  @Override
+  public Pipeline getContainer(String containerName) throws IOException {
+    Pipeline pipeline = null;
+    lock.lock();
+    try {
+      byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+      if (pipelineBytes == null) {
+        throw new IOException("Specified key does not exist. key : " +
+            containerName);
+      }
+      pipeline = Pipeline.getFromProtoBuf(
+          ContainerProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
+      return pipeline;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Allocates a new container.
+   *
+   * @param containerName - Name of the container.
+   * @return - Pipeline that makes up this container.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline allocateContainer(String containerName) throws IOException {
+    Preconditions.checkNotNull(containerName);
+    Preconditions.checkState(!containerName.isEmpty());
+    Pipeline pipeline = null;
+    if (!nodeManager.isOutOfNodeChillMode()) {
+      throw new IOException("Unable to create container while in chill mode");
+    }
+
+    lock.lock();
+    try {
+      byte[] pipelineBytes = store.get(containerName.getBytes(encoding));
+      if (pipelineBytes != null) {
+        throw new IOException("Specified container already exists. key : " +
+            containerName);
+      }
+      DatanodeID id = getDatanodeID();
+      if (id != null) {
+        pipeline = newPipelineFromNodes(id, containerName);
+        store.put(containerName.getBytes(encoding),
+            pipeline.getProtobufMessage().toByteArray());
+      }
+    } finally {
+      lock.unlock();
+    }
+    return pipeline;
+  }
+
+  /**
+   * Returns a random Datanode ID from the list of healthy nodes.
+   *
+   * @return Datanode ID
+   * @throws IOException
+   */
+  private DatanodeID getDatanodeID() throws IOException {
+    List<DatanodeID> healthyNodes =
+        nodeManager.getNodes(NodeManager.NODESTATE.HEALTHY);
+
+    if (healthyNodes.size() == 0) {
+      throw new IOException("No healthy node found to allocate container.");
+    }
+
+    int index = rand.nextInt() % healthyNodes.size();
+    return healthyNodes.get(Math.abs(index));
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it. If
+   * the stream is already closed then invoking this method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the close may
+   * fail require careful attention. It is strongly advised to relinquish the
+   * underlying resources and to internally <em>mark</em> the {@code Closeable}
+   * as closed, prior to throwing the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    if (store != null) {
+      store.close();
+    }
+  }
+}
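End to end, the new mapping might be exercised as in this sketch (illustrative, not from the patch: the metadata directory is invented, and nodeManager stands for any NodeManager that reports healthy nodes and is out of chill mode, such as the MockNodeManager added below; try-with-resources works because Mapping extends Closeable):

Configuration conf = new Configuration();
conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, "/tmp/scm-meta");
try (ContainerMapping mapping = new ContainerMapping(conf, nodeManager, 128)) {
  Pipeline created = mapping.allocateContainer("demo-container");
  Pipeline fetched = mapping.getContainer("demo-container");
  // Allocation and lookup resolve to the same leader datanode.
  String leader = fetched.getLeader().getDatanodeUuid();
}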
Mapping.java (new file):

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container;
+
+
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Mapping class contains the mapping from a name to a pipeline mapping. This is
+ * used by SCM when allocating new locations and when looking up a key.
+ */
+public interface Mapping extends Closeable {
+  /**
+   * Returns the Pipeline from the container name.
+   *
+   * @param containerName - Name
+   * @return - Pipeline that makes up this container.
+   * @throws IOException
+   */
+  Pipeline getContainer(String containerName) throws IOException;
+
+  /**
+   * Allocates a new container for a given keyName.
+   *
+   * @param containerName - Name
+   * @return - Pipeline that makes up this container.
+   * @throws IOException
+   */
+  Pipeline allocateContainer(String containerName) throws IOException;
+}
package-info.java for org.apache.hadoop.ozone.scm.container (new file):

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.scm.container;
+/**
+ * This package contains routines to manage the container location and
+ * mapping inside SCM
+ */
StorageContainerManager.java:

@@ -199,6 +199,21 @@ public class StorageContainerManager
     return locatedContainers;
   }
 
+  /**
+   * Asks SCM where a container should be allocated. SCM responds with the set
+   * of datanodes that should be used for creating this container.
+   *
+   * @param containerName - Name of the container.
+   * @return Pipeline.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline allocateContainer(String containerName) throws IOException {
+    // TODO : This whole file will be replaced when we switch over to using
+    // the new protocol. So skipping connecting this code for now.
+    return null;
+  }
+
   @Override
   public DatanodeRegistration registerDatanode(
       DatanodeRegistration registration) throws IOException {
StorageContainerLocationProtocol.proto:

@@ -29,6 +29,7 @@ option java_generate_equals_and_hash = true;
 package hadoop.hdfs;
 
 import "hdfs.proto";
+import "DatanodeContainerProtocol.proto";
 
 /**
  * keys - batch of object keys to find
@@ -56,6 +57,27 @@ message LocatedContainerProto {
   required DatanodeInfoProto leader = 5;
 }
 
+/**
+ * Request sent to SCM asking where the container should be created.
+ */
+message ContainerRequestProto {
+  required string containerName = 1;
+}
+
+/**
+ * Reply from SCM indicating the result of the container request.
+ */
+message ContainerResponseProto {
+  enum Error {
+    success = 1;
+    errorContainerAlreadyExists = 2;
+    errorContainerMissing = 3;
+  }
+  required Error errorCode = 1;
+  required hadoop.hdfs.ozone.Pipeline pipeline = 2;
+  optional string errorMessage = 3;
+}
+
 /**
  * Protocol used from an HDFS node to StorageContainerManager. See the request
  * and response messages for details of the RPC calls.
@@ -68,4 +90,10 @@ service StorageContainerLocationProtocolService {
    */
   rpc getStorageContainerLocations(GetStorageContainerLocationsRequestProto)
       returns(GetStorageContainerLocationsResponseProto);
+
+  /**
+   * Creates a container entry in SCM.
+   */
+  rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
+
 }
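At the protobuf level the new exchange reduces to the following sketch (the request construction mirrors the client translator above; response stands for a ContainerResponseProto received from the RPC, and the container name is invented):

ContainerRequestProto request = ContainerRequestProto.newBuilder()
    .setContainerName("demo-container")
    .build();
// ... the RPC call happens here ...
if (response.getErrorCode() == ContainerResponseProto.Error.success) {
  Pipeline pipeline = Pipeline.getFromProtoBuf(response.getPipeline());
} else if (response.hasErrorMessage()) {
  throw new IOException(response.getErrorMessage());
}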
MockNodeManager.java (new test helper):

@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Test Helper for testing container Mapping.
+ */
+public class MockNodeManager implements NodeManager {
+  private final List<DatanodeID> healthyNodes;
+  private static final int HEALTHY_NODE_COUNT = 10;
+  private boolean chillmode;
+
+  public MockNodeManager() {
+    this.healthyNodes = new LinkedList<>();
+    for (int x = 0; x < 10; x++) {
+      healthyNodes.add(SCMTestUtils.getDatanodeID());
+    }
+    chillmode = false;
+  }
+
+  /**
+   * Sets the chill mode value.
+   * @param chillmode
+   */
+  public void setChillmode(boolean chillmode) {
+    this.chillmode = chillmode;
+  }
+
+  /**
+   * Removes a data node from the management of this Node Manager.
+   *
+   * @param node - DataNode.
+   * @throws UnregisteredNodeException
+   */
+  @Override
+  public void removeNode(DatanodeID node) throws UnregisteredNodeException {
+
+  }
+
+  /**
+   * Gets all Live Datanodes that are currently communicating with SCM.
+   *
+   * @param nodestate - State of the node
+   * @return List of Datanodes that are Heartbeating SCM.
+   */
+  @Override
+  public List<DatanodeID> getNodes(NODESTATE nodestate) {
+    if (nodestate == NODESTATE.HEALTHY) {
+      return healthyNodes;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the Number of Datanodes that are communicating with SCM.
+   *
+   * @param nodestate - State of the node
+   * @return int -- count
+   */
+  @Override
+  public int getNodeCount(NODESTATE nodestate) {
+    if (nodestate == NODESTATE.HEALTHY) {
+      return HEALTHY_NODE_COUNT;
+    }
+    return 0;
+  }
+
+  /**
+   * Get all datanodes known to SCM.
+   *
+   * @return List of DatanodeIDs known to SCM.
+   */
+  @Override
+  public List<DatanodeID> getAllNodes() {
+    return null;
+  }
+
+  /**
+   * Get the minimum number of nodes to get out of chill mode.
+   *
+   * @return int
+   */
+  @Override
+  public int getMinimumChillModeNodes() {
+    return 0;
+  }
+
+  /**
+   * Reports if we have exited out of chill mode by discovering enough nodes.
+   *
+   * @return True if we are out of Node layer chill mode, false otherwise.
+   */
+  @Override
+  public boolean isOutOfNodeChillMode() {
+    return !chillmode;
+  }
+
+  /**
+   * Chill mode is the period when node manager waits for a minimum configured
+   * number of datanodes to report in. This is called chill mode to indicate the
+   * period before node manager gets into action.
+   * <p>
+   * Forcefully exits the chill mode, even if we have not met the minimum
+   * criteria of the nodes reporting in.
+   */
+  @Override
+  public void forceExitChillMode() {
+
+  }
+
+  /**
+   * Forcefully enters chill mode, even if all minimum node conditions are met.
+   */
+  @Override
+  public void forceEnterChillMode() {
+
+  }
+
+  /**
+   * Clears the manual chill mode flag.
+   */
+  @Override
+  public void clearChillModeFlag() {
+
+  }
+
+  /**
+   * Returns a chill mode status string.
+   *
+   * @return String
+   */
+  @Override
+  public String getChillModeStatus() {
+    return null;
+  }
+
+  /**
+   * Returns the status of the manual chill mode flag.
+   *
+   * @return true if forceEnterChillMode has been called, false if
+   * forceExitChillMode or status is not set, e.g. clearChillModeFlag.
+   */
+  @Override
+  public boolean isInManualChillMode() {
+    return false;
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it. If
+   * the stream is already closed then invoking this method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the close may
+   * fail require careful attention. It is strongly advised to relinquish the
+   * underlying resources and to internally <em>mark</em> the {@code Closeable}
+   * as closed, prior to throwing the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  /**
+   * When an object implementing interface <code>Runnable</code> is used to
+   * create a thread, starting the thread causes the object's <code>run</code>
+   * method to be called in that separately executing thread.
+   * <p>
+   * The general contract of the method <code>run</code> is that it may take any
+   * action whatsoever.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+
+  }
+}
TestContainerMapping.java (new test):

@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+/**
+ * Tests for Container Mapping.
+ */
+public class TestContainerMapping {
+  private static ContainerMapping mapping;
+  private static MockNodeManager nodeManager;
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+
+    URL p = conf.getClass().getResource("");
+    String path = p.getPath().concat(
+        TestContainerMapping.class.getSimpleName());
+
+    conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, path);
+    File testDir = Paths.get(path).toFile();
+    boolean folderExisted = testDir.exists() || testDir.mkdirs();
+    if (!folderExisted) {
+      throw new IOException("Unable to create test directory path");
+    }
+    nodeManager = new MockNodeManager();
+    mapping = new ContainerMapping(conf, nodeManager, 128);
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    mapping.close();
+  }
+
+  @Before
+  public void clearChillMode() {
+    nodeManager.setChillmode(false);
+  }
+
+  @Test
+  public void testallocateContainer() throws Exception {
+    Pipeline pipeline = mapping.allocateContainer(UUID.randomUUID().toString());
+    Assert.assertNotNull(pipeline);
+  }
+
+  @Test
+  public void testallocateContainerDistributesAllocation() throws Exception {
+    /* This is a lame test, we should really be testing something like
+       z-score or make sure that we don't have 3sigma kind of events. Too lazy
+       to write all that code. This test very lamely tests if we have more than
+       5 separate nodes from the list of 10 datanodes that got allocated a
+       container.
+     */
+    Set<String> pipelineList = new TreeSet<>();
+    for (int x = 0; x < 30; x++) {
+      Pipeline pipeline = mapping.allocateContainer(UUID.randomUUID()
+          .toString());
+
+      Assert.assertNotNull(pipeline);
+      pipelineList.add(pipeline.getLeader().getDatanodeUuid());
+    }
+    Assert.assertTrue(pipelineList.size() > 5);
+  }
+
+  @Test
+  public void testGetContainer() throws IOException {
+    String containerName = UUID.randomUUID().toString();
+    Pipeline pipeline = mapping.allocateContainer(containerName);
+    Assert.assertNotNull(pipeline);
+    Pipeline newPipeline = mapping.getContainer(containerName);
+    Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
+        newPipeline.getLeader().getDatanodeUuid());
+  }
+
+  @Test
+  public void testDuplicateAllocateContainerFails() throws IOException {
+    String containerName = UUID.randomUUID().toString();
+    Pipeline pipeline = mapping.allocateContainer(containerName);
+    Assert.assertNotNull(pipeline);
+    thrown.expectMessage("Specified container already exists.");
+    mapping.allocateContainer(containerName);
+  }
+
+  @Test
+  public void testgetNoneExistentContainer() throws IOException {
+    String containerName = UUID.randomUUID().toString();
+    thrown.expectMessage("Specified key does not exist.");
+    mapping.getContainer(containerName);
+  }
+
+  @Test
+  public void testChillModeAllocateContainerFails() throws IOException {
+    String containerName = UUID.randomUUID().toString();
+    nodeManager.setChillmode(true);
+    thrown.expectMessage("Unable to create container while in chill mode");
+    mapping.allocateContainer(containerName);
+  }
+}