HDFS-11184. Ozone: SCM: Make SCM use container protocol. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2017-02-27 12:25:03 -08:00 committed by Owen O'Malley
parent c169dd1306
commit bb410de10c
33 changed files with 1042 additions and 925 deletions

View File

@ -63,7 +63,6 @@ public class Pipeline {
return newPipeline;
}
/** Adds a member to pipeline */
/**
* Adds a member to the pipeline.

View File

@ -27,19 +27,30 @@ import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.GetKeyRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.PutKeyRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ReadChunkRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutSmallFileRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetSmallFileRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.PutSmallFileRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.GetSmallFileResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.GetSmallFileRequestProto;
import org.apache.hadoop.scm.XceiverClient;
/**
@ -210,6 +221,33 @@ public final class ContainerProtocolCalls {
validateContainerResponse(response, traceID);
}
/**
* createContainer call that creates a container on the datanode.
* @param client - client
* @param traceID - traceID
* @throws IOException
*/
public static void createContainer(XceiverClient client, String traceID)
throws IOException {
ContainerProtos.CreateContainerRequestProto.Builder createRequest =
ContainerProtos.CreateContainerRequestProto
.newBuilder();
ContainerProtos.ContainerData.Builder containerData = ContainerProtos
.ContainerData.newBuilder();
containerData.setName(client.getPipeline().getContainerName());
createRequest.setPipeline(client.getPipeline().getProtobufMessage());
createRequest.setContainerData(containerData.build());
ContainerCommandRequestProto.Builder request =
ContainerCommandRequestProto.newBuilder();
request.setCmdType(ContainerProtos.Type.CreateContainer);
request.setCreateContainer(createRequest);
request.setTraceID(traceID);
ContainerCommandResponseProto response = client.sendCommand(
request.build());
validateContainerResponse(response, traceID);
}
/**
* Reads the data given the container name and key.
*

View File

@ -321,9 +321,9 @@ public final class OzoneClientUtils {
* @param conf - Ozone Config
* @return - HB interval in seconds.
*/
public static int getScmHeartbeatInterval(Configuration conf) {
return conf.getInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT);
public static long getScmHeartbeatInterval(Configuration conf) {
return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT, TimeUnit.SECONDS);
}
/**

View File

@ -94,11 +94,6 @@ public final class OzoneConfigKeys {
public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L;
public static final String OZONE_SCM_CONTAINER_THREADS =
"ozone.scm.container.threads";
public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT =
Runtime.getRuntime().availableProcessors() * 2;
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT =
"ozone.scm.heartbeat.rpc-timeout";
public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT =
@ -142,6 +137,9 @@ public final class OzoneConfigKeys {
public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id";
public static final String OZONE_SCM_DB_CACHE_SIZE_MB =
"ozone.scm.db.cache.size.mb";
public static final int OZONE_SCM_DB_CACHE_SIZE_DEFAULT = 128;
/**
* There is no need to instantiate this class.

View File

@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -20,7 +20,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@ -29,10 +28,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* State Machine Class.
@ -55,14 +52,13 @@ public class DatanodeStateMachine implements Closeable {
*/
public DatanodeStateMachine(Configuration conf) throws IOException {
this.conf = conf;
executorService = HadoopExecutors.newScheduledThreadPool(
this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Datanode State Machine Thread - %d").build());
connectionManager = new SCMConnectionManager(conf);
context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
heartbeatFrequency = TimeUnit.SECONDS.toMillis(
OzoneClientUtils.getScmHeartbeatInterval(conf));
container = new OzoneContainer(conf);
}
@ -84,6 +80,7 @@ public class DatanodeStateMachine implements Closeable {
container.start();
while (context.getState() != DatanodeStates.SHUTDOWN) {
try {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB = Time.monotonicNow() + heartbeatFrequency;
context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS);
@ -91,8 +88,8 @@ public class DatanodeStateMachine implements Closeable {
if (now < nextHB) {
Thread.sleep(nextHB - now);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("Unable to finish the execution", e);
} catch (Exception e) {
LOG.error("Unable to finish the execution.", e);
}
}
}

View File

@ -187,5 +187,13 @@ public class StateContext {
}
}
/**
* Returns the count of the Execution.
* @return long
*/
public long getExecutionCount() {
return stateExecutionCount.get();
}
}

View File

@ -22,22 +22,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.endpoint
.HeartbeatEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint
.RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint
.VersionEndpointTask;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -109,7 +102,7 @@ public class RunningDatanodeState implements DatanodeState {
DatanodeID temp = new DatanodeID(
//TODO : Replace this with proper network and kerberos
// support code.
InetAddress.getLocalHost().getHostAddress().toString(),
InetAddress.getLocalHost().getHostAddress(),
DataNode.getHostName(conf),
UUID.randomUUID().toString(),
0, /** XferPort - SCM does not use this port */
@ -134,6 +127,13 @@ public class RunningDatanodeState implements DatanodeState {
private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
createNewContainerID(Path idPath)
throws IOException {
if(!idPath.getParent().toFile().exists() &&
!idPath.getParent().toFile().mkdirs()) {
LOG.error("Failed to create container ID locations. Path: {}",
idPath.getParent());
throw new IOException("Unable to create container ID directories.");
}
StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto
containerIDProto = StorageContainerDatanodeProtocolProtos
.ContainerNodeIDProto.newBuilder()
@ -213,7 +213,8 @@ public class RunningDatanodeState implements DatanodeState {
ecs.submit(endpointTask);
}
}
//TODO : Cache some of these tasks instead of creating them
//all the time.
private Callable<EndpointStateMachine.EndPointStates>
getEndPointTask(EndpointStateMachine endpoint) {
switch (endpoint.getState()) {

View File

@ -49,7 +49,7 @@ public class VersionEndpointTask implements
rpcEndPoint.lock();
try{
SCMVersionResponseProto versionResponse =
rpcEndPoint.getEndPoint().getVersion();
rpcEndPoint.getEndPoint().getVersion(null);
rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse));
EndpointStateMachine.EndPointStates nextState =

View File

@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
@ -34,7 +35,8 @@ public interface StorageContainerDatanodeProtocol {
* Returns SCM version.
* @return Version info.
*/
SCMVersionResponseProto getVersion() throws IOException;
SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest)
throws IOException;
/**
* Used by data node to send a Heartbeat.

View File

@ -44,8 +44,8 @@ public class NullCommand extends SCMCommand<NullCmdResponseProto> {
* @return A protobuf message.
*/
@Override
public NullCmdResponseProto getProtoBufMessage() {
return NullCmdResponseProto.newBuilder().build();
public byte[] getProtoBufMessage() {
return NullCmdResponseProto.newBuilder().build().toByteArray();
}
/**

View File

@ -57,7 +57,7 @@ public class RegisteredCommand extends
* @return Type
*/
@Override
Type getType() {
public Type getType() {
return Type.registeredCommand;
}
@ -94,12 +94,12 @@ public class RegisteredCommand extends
* @return A protobuf message.
*/
@Override
SCMRegisteredCmdResponseProto getProtoBufMessage() {
public byte[] getProtoBufMessage() {
return SCMRegisteredCmdResponseProto.newBuilder()
.setClusterID(this.clusterID)
.setDatanodeUUID(this.datanodeUUID)
.setErrorCode(this.error)
.build();
.build().toByteArray();
}
/**

View File

@ -31,11 +31,11 @@ public abstract class SCMCommand<T extends GeneratedMessage> {
* Returns the type of this command.
* @return Type
*/
abstract Type getType();
public abstract Type getType();
/**
* Gets the protobuf message of this object.
* @return A protobuf message.
*/
abstract T getProtoBufMessage();
public abstract byte[] getProtoBufMessage();
}

View File

@ -92,11 +92,12 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
/**
* Returns SCM version.
*
* @param unused - set to null and unused.
* @return Version info.
*/
@Override
public SCMVersionResponseProto getVersion() throws IOException {
public SCMVersionResponseProto getVersion(SCMVersionRequestProto
unused) throws IOException {
SCMVersionRequestProto request =
SCMVersionRequestProto.newBuilder().build();
final SCMVersionResponseProto response;

View File

@ -47,7 +47,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request)
throws ServiceException {
try {
return impl.getVersion();
return impl.getVersion(request);
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -31,10 +31,18 @@ import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.LocatedContainerProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.GetStorageContainerLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.GetStorageContainerLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.LocatedContainerProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
/**
* This class is the server-side translator that forwards requests received on
@ -94,6 +102,15 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
public ContainerResponseProto allocateContainer(RpcController unused,
StorageContainerLocationProtocolProtos.ContainerRequestProto request)
throws ServiceException {
return null;
try {
Pipeline pipeline = impl.allocateContainer(request.getContainerName());
return ContainerResponseProto.newBuilder()
.setPipeline(pipeline.getProtobufMessage())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,442 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.commands.NullCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_DATANODE_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_SCM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
/**
* StorageContainerManager is the main entry point for the service that provides
* information about which SCM nodes host containers.
*
* DataNodes report to StorageContainerManager using heartbeat
* messages. SCM allocates containers and returns a pipeline.
*
* A client once it gets a pipeline (a list of datanodes) will connect to the
* datanodes and create a container, which then can be used to store data.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public class StorageContainerManager
implements StorageContainerDatanodeProtocol,
StorageContainerLocationProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(StorageContainerManager.class);
/**
* NodeManager and container Managers for SCM.
*/
private final NodeManager scmNodeManager;
private final Mapping scmContainerManager;
/** The RPC server that listens to requests from DataNodes. */
private final RPC.Server datanodeRpcServer;
private final InetSocketAddress datanodeRpcAddress;
/** The RPC server that listens to requests from clients. */
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
/**
* Creates a new StorageContainerManager. Configuration will be updated with
* information on the actual listening addresses used for RPC servers.
*
* @param conf configuration
*/
public StorageContainerManager(OzoneConfiguration conf)
throws IOException {
final int handlerCount = conf.getInt(
OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
// TODO : Fix the ClusterID generation code.
scmNodeManager = new SCMNodeManager(conf, UUID.randomUUID().toString());
scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
new StorageContainerDatanodeProtocolServerSideTranslatorPB(this));
final InetSocketAddress datanodeRpcAddr =
OzoneClientUtils.getScmDataNodeBindAddress(conf);
datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
handlerCount);
datanodeRpcAddress = updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(
this));
final InetSocketAddress scmAddress =
OzoneClientUtils.getScmClientBindAddress(conf);
clientRpcServer = startRpcServer(conf, scmAddress,
StorageContainerLocationProtocolPB.class, storageProtoPbService,
handlerCount);
clientRpcAddress = updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
}
/**
* Builds a message for logging startup information about an RPC server.
*
* @param description RPC server description
* @param addr RPC server listening address
* @return server startup message
*/
private static String buildRpcServerStartMessage(String description,
InetSocketAddress addr) {
return addr != null ? String.format("%s is listening at %s",
description, addr.toString()) :
String.format("%s not started", description);
}
/**
* Starts an RPC server, if configured.
*
* @param conf configuration
* @param addr configured address of RPC server
* @param protocol RPC protocol provided by RPC server
* @param instance RPC protocol implementation instance
* @param handlerCount RPC server handler count
*
* @return RPC server
* @throws IOException if there is an I/O error while creating RPC server
*/
private static RPC.Server startRpcServer(OzoneConfiguration conf,
InetSocketAddress addr, Class<?> protocol, BlockingService instance,
int handlerCount)
throws IOException {
RPC.Server rpcServer = new RPC.Builder(conf)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(addr.getHostString())
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(null)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
return rpcServer;
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server.
*/
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostString(), listenAddr.getPort());
conf.set(rpcAddressKey,
listenAddr.getHostString() + ":" + listenAddr.getPort());
return updatedAddr;
}
/**
* Main entry point for starting StorageContainerManager.
*
* @param argv arguments
* @throws IOException if startup fails due to I/O error
*/
public static void main(String[] argv) throws IOException {
StringUtils.startupShutdownMessage(StorageContainerManager.class,
argv, LOG);
try {
StorageContainerManager scm = new StorageContainerManager(
new OzoneConfiguration());
scm.start();
scm.join();
} catch (Throwable t) {
LOG.error("Failed to start the StorageContainerManager.", t);
terminate(1, t);
}
}
/**
* Returns a SCMCommandRepose from the SCM Command.
* @param cmd - Cmd
* @return SCMCommandResponseProto
* @throws InvalidProtocolBufferException
*/
@VisibleForTesting
public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
throws InvalidProtocolBufferException {
Type type = cmd.getType();
switch (type) {
case nullCmd:
Preconditions.checkState(cmd.getClass() == NullCommand.class);
return SCMCommandResponseProto.newBuilder().setCmdType(cmd.getType())
.setNullCommand(
NullCmdResponseProto.parseFrom(cmd.getProtoBufMessage()))
.build();
default:
throw new IllegalArgumentException("Not implemented");
}
}
@VisibleForTesting
public static SCMRegisteredCmdResponseProto getRegisteredResponse(
SCMCommand cmd, SCMNodeAddressList addressList) {
Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
RegisteredCommand rCmd = (RegisteredCommand) cmd;
StorageContainerDatanodeProtocolProtos.Type type = cmd.getType();
if (type != Type.registeredCommand) {
throw new IllegalArgumentException("Registered command is not well " +
"formed. Internal Error.");
}
return SCMRegisteredCmdResponseProto.newBuilder()
//TODO : Fix this later when we have multiple SCM support.
//.setAddressList(addressList)
.setErrorCode(rCmd.getError())
.setClusterID(rCmd.getClusterID())
.setDatanodeUUID(rCmd.getDatanodeUUID()).build();
}
// TODO : This code will move into KSM later. Write now this code is stubbed
// implementation that lets the ozone tests pass.
@Override
public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException {
throw new IOException("Not Implemented.");
}
/**
* Asks SCM where a container should be allocated. SCM responds with the set
* of datanodes that should be used creating this container.
*
* @param containerName - Name of the container.
* @return Pipeline.
* @throws IOException
*/
@Override
public Pipeline allocateContainer(String containerName) throws IOException {
return scmContainerManager.allocateContainer(containerName);
}
/**
* Returns listening address of StorageLocation Protocol RPC server.
*
* @return listen address of StorageLocation RPC server
*/
@VisibleForTesting
public InetSocketAddress getClientRpcAddress() {
return clientRpcAddress;
}
/**
* Returns listening address of StorageDatanode Protocol RPC server.
*
* @return Address where datanode are communicating.
*/
public InetSocketAddress getDatanodeRpcAddress() {
return datanodeRpcAddress;
}
/**
* Start service.
*/
public void start() {
LOG.info(buildRpcServerStartMessage(
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
clientRpcServer.start();
LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
datanodeRpcAddress));
datanodeRpcServer.start();
}
/**
* Stop service.
*/
public void stop() {
LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
clientRpcServer.stop();
LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop();
}
/**
* Wait until service has completed shutdown.
*/
public void join() {
try {
clientRpcServer.join();
datanodeRpcServer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted during StorageContainerManager join.");
}
}
/**
* Returns SCM version.
*
* @return Version info.
*/
@Override
public SCMVersionResponseProto getVersion(
SCMVersionRequestProto versionRequest) throws IOException {
return getScmNodeManager().getVersion(versionRequest).getProtobufMessage();
}
/**
* Used by data node to send a Heartbeat.
*
* @param datanodeID - Datanode ID.
* @return - SCMHeartbeatResponseProto
* @throws IOException
*/
@Override
public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
throws IOException {
List<SCMCommand> commands = getScmNodeManager().sendHeartbeat(datanodeID);
List<SCMCommandResponseProto> cmdReponses = new LinkedList<>();
for (SCMCommand cmd : commands) {
cmdReponses.add(getCommandResponse(cmd));
}
return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdReponses)
.build();
}
/**
* Register Datanode.
*
* @param datanodeID - DatanodID.
* @param scmAddresses - List of SCMs this datanode is configured to
* communicate.
* @return SCM Command.
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
register(DatanodeID datanodeID, String[] scmAddresses)
throws IOException {
// TODO : Return the list of Nodes that forms the SCM HA.
return getRegisteredResponse(scmNodeManager.register(datanodeID), null);
}
/**
* Returns the Number of Datanodes that are communicating with SCM.
*
* @param nodestate Healthy, Dead etc.
* @return int -- count
*/
public int getNodeCount(SCMNodeManager.NODESTATE nodestate) {
return scmNodeManager.getNodeCount(nodestate);
}
/**
* Returns node manager.
* @return - Node Manager
*/
@VisibleForTesting
public NodeManager getScmNodeManager() {
return scmNodeManager;
}
}

View File

@ -46,7 +46,7 @@ public class ContainerMapping implements Mapping {
LoggerFactory.getLogger(ContainerMapping.class);
private final NodeManager nodeManager;
private final int cacheSize;
private final long cacheSize;
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
private final LevelDBStore store;
@ -77,7 +77,7 @@ public class ContainerMapping implements Mapping {
}
File dbPath = new File(scmMetaDataDir, "SCM.db");
Options options = new Options();
options.cacheSize(this.cacheSize * (1024 * 1024));
options.cacheSize(this.cacheSize * (1024L * 1024L));
options.createIfMissing();
store = new LevelDBStore(dbPath, options);
this.lock = new ReentrantLock();
@ -85,6 +85,7 @@ public class ContainerMapping implements Mapping {
}
/**
* // TODO : Fix the code to handle multiple nodes.
* Translates a list of nodes, ordered such that the first is the leader, into
* a corresponding {@link Pipeline} object.
*

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm.node;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import java.io.Closeable;
import java.util.List;
@ -45,9 +46,8 @@ import java.util.List;
* DECOMMISSIONED - Someone told us to remove this node from the tracking
* list, by calling removeNode. We will throw away this nodes info soon.
*/
public interface NodeManager extends Closeable, Runnable {
public interface NodeManager extends StorageContainerNodeProtocol, Closeable,
Runnable {
/**
* Removes a data node from the management of this Node Manager.
*

View File

@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -17,6 +17,6 @@
package org.apache.hadoop.ozone.scm;
/**
/*
* This package contains StorageContainerManager classes.
*/

View File

@ -1,563 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.storage;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.XceiverClient;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.StringUtils;
/**
* StorageContainerManager is the main entry point for the service that provides
* information about which HDFS nodes host containers.
*
* The current implementation is a stub suitable to begin end-to-end testing of
* Ozone service interactions. DataNodes report to StorageContainerManager
* using the existing heartbeat messages. StorageContainerManager lazily
* initializes a single storage container to be served by those DataNodes.
* All subsequent requests for container locations will reply with that single
* pipeline, using all registered nodes.
*
* This will evolve from a stub to a full-fledged implementation capable of
* partitioning the keyspace across multiple containers, with appropriate
* distribution across nodes.
*/
@InterfaceAudience.Private
public class StorageContainerManager
implements DatanodeProtocol, StorageContainerLocationProtocol {
private static final Logger LOG =
LoggerFactory.getLogger(StorageContainerManager.class);
private final StorageContainerNameService ns;
private final BlockManager blockManager;
private final XceiverClientManager xceiverClientManager;
private Pipeline singlePipeline;
/** The RPC server that listens to requests from DataNodes. */
private final RPC.Server datanodeRpcServer;
private final InetSocketAddress datanodeRpcAddress;
/** The RPC server that listens to requests from clients. */
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
/**
* Creates a new StorageContainerManager. Configuration will be updated with
* information on the actual listening addresses used for RPC servers.
*
* @param conf configuration
*/
public StorageContainerManager(OzoneConfiguration conf)
throws IOException {
ns = new StorageContainerNameService();
boolean haEnabled = false;
blockManager = new BlockManager(ns, haEnabled, conf);
xceiverClientManager = new XceiverClientManager(conf);
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine.class);
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
final int handlerCount = conf.getInt(
OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
final int maxDataLength = conf.getInt(IPC_MAXIMUM_DATA_LENGTH,
IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
BlockingService dnProtoPbService = DatanodeProtocolProtos.
DatanodeProtocolService.newReflectiveBlockingService(
new DatanodeProtocolServerSideTranslatorPB(this, maxDataLength));
final InetSocketAddress datanodeRpcAddr =
OzoneClientUtils.getScmDataNodeBindAddress(conf);
datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
DatanodeProtocolPB.class, dnProtoPbService, handlerCount);
datanodeRpcAddress = updateListenAddress(conf,
OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
datanodeRpcAddress));
BlockingService storageProtoPbService =
StorageContainerLocationProtocolProtos
.StorageContainerLocationProtocolService
.newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this));
final InetSocketAddress clientRpcAddr =
OzoneClientUtils.getScmClientBindAddress(conf);
clientRpcServer = startRpcServer(conf, clientRpcAddr,
StorageContainerLocationProtocolPB.class, storageProtoPbService,
handlerCount);
clientRpcAddress = updateListenAddress(conf,
OZONE_SCM_CLIENT_ADDRESS_KEY, clientRpcAddr, clientRpcServer);
LOG.info(buildRpcServerStartMessage(
"StorageContainerLocationProtocol RPC server", clientRpcAddress));
}
@Override
public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException {
LOG.trace("getStorageContainerLocations keys = {}", keys);
Pipeline pipeline = initSingleContainerPipeline();
List<DatanodeDescriptor> liveNodes = new ArrayList<>();
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
if (liveNodes.isEmpty()) {
throw new IOException("Storage container locations not found.");
}
Set<DatanodeInfo> locations =
Sets.<DatanodeInfo>newLinkedHashSet(liveNodes);
DatanodeInfo leader = liveNodes.get(0);
Set<LocatedContainer> locatedContainers =
Sets.newLinkedHashSetWithExpectedSize(keys.size());
for (String key: keys) {
locatedContainers.add(new LocatedContainer(key, key,
pipeline.getContainerName(), locations, leader));
}
LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}",
keys, locatedContainers);
return locatedContainers;
}
/**
* Asks SCM where a container should be allocated. SCM responds with the set
* of datanodes that should be used creating this container.
*
* @param containerName - Name of the container.
* @return Pipeline.
* @throws IOException
*/
@Override
public Pipeline allocateContainer(String containerName) throws IOException {
// TODO : This whole file will be replaced when we switch over to using
// the new protocol. So skipping connecting this code for now.
return null;
}
@Override
public DatanodeRegistration registerDatanode(
DatanodeRegistration registration) throws IOException {
ns.writeLock();
try {
blockManager.getDatanodeManager().registerDatanode(registration);
} finally {
ns.writeUnlock();
}
return registration;
}
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
ns.readLock();
try {
long cacheCapacity = 0;
long cacheUsed = 0;
int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager()
.handleHeartbeat(registration, reports, blockManager.getBlockPoolId(),
cacheCapacity, cacheUsed, xceiverCount, maxTransfer,
failedVolumes, volumeFailureSummary);
long txnId = 234;
NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
HAServiceProtocol.HAServiceState.ACTIVE, txnId);
RollingUpgradeInfo rollingUpgradeInfo = null;
long blockReportLeaseId = requestFullBlockReportLease ?
blockManager.requestBlockReportLeaseId(registration) : 0;
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
blockReportLeaseId);
} finally {
ns.readUnlock();
}
}
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports, BlockReportContext context)
throws IOException {
for (int r = 0; r < reports.length; r++) {
final BlockListAsLongs storageContainerList = reports[r].getBlocks();
blockManager.processReport(registration, reports[r].getStorage(),
storageContainerList, context);
}
return null;
}
@Override
public DatanodeCommand cacheReport(DatanodeRegistration registration,
String poolId, List<Long> blockIds) throws IOException {
// Centralized Cache Management is not supported
return null;
}
@Override
public void blockReceivedAndDeleted(DatanodeRegistration registration,
String poolId, StorageReceivedDeletedBlocks[] rcvdAndDeletedBlocks)
throws IOException {
for(StorageReceivedDeletedBlocks r : rcvdAndDeletedBlocks) {
ns.writeLock();
try {
blockManager.processIncrementalBlockReport(registration, r);
} finally {
ns.writeUnlock();
}
}
}
@Override
public void errorReport(DatanodeRegistration registration,
int errorCode, String msg) throws IOException {
String dnName =
(registration == null) ? "Unknown DataNode" : registration.toString();
if (errorCode == DatanodeProtocol.NOTIFY) {
LOG.info("Error report from " + dnName + ": " + msg);
return;
}
if (errorCode == DatanodeProtocol.DISK_ERROR) {
LOG.warn("Disk error on " + dnName + ": " + msg);
} else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
LOG.warn("Fatal disk error on " + dnName + ": " + msg);
blockManager.getDatanodeManager().removeDatanode(registration);
} else {
LOG.info("Error report from " + dnName + ": " + msg);
}
}
@Override
public NamespaceInfo versionRequest() throws IOException {
ns.readLock();
try {
return new NamespaceInfo(1, "random", "random", 2,
NodeType.STORAGE_CONTAINER_SERVICE);
} finally {
ns.readUnlock();
}
}
@Override
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
ns.writeLock();
try {
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
String[] storageIDs = blocks[i].getStorageIDs();
for (int j = 0; j < nodes.length; j++) {
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
storageIDs == null ? null: storageIDs[j],
"client machine reported it");
}
}
} finally {
ns.writeUnlock();
}
}
@Override
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile,
boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages)
throws IOException {
// Not needed for the purpose of object store
throw new UnsupportedOperationException();
}
/**
* Returns information on registered DataNodes.
*
* @param type DataNode type to report
* @return registered DataNodes matching requested type
*/
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) {
ns.readLock();
try {
List<DatanodeDescriptor> results =
blockManager.getDatanodeManager().getDatanodeListForReport(type);
return results.toArray(new DatanodeInfo[results.size()]);
} finally {
ns.readUnlock();
}
}
/**
* Returns listen address of client RPC server.
*
* @return listen address of client RPC server
*/
@VisibleForTesting
public InetSocketAddress getClientRpcAddress() {
return clientRpcAddress;
}
/**
* Start service.
*/
public void start() {
clientRpcServer.start();
datanodeRpcServer.start();
}
/**
* Stop service.
*/
public void stop() {
if (clientRpcServer != null) {
clientRpcServer.stop();
}
if (datanodeRpcServer != null) {
datanodeRpcServer.stop();
}
IOUtils.closeStream(ns);
}
/**
* Wait until service has completed shutdown.
*/
public void join() {
try {
clientRpcServer.join();
datanodeRpcServer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted during StorageContainerManager join.");
}
}
/**
* Lazily initializes a single container pipeline using all registered
* DataNodes via a synchronous call to the container protocol. This single
* container pipeline will be reused for container requests for the lifetime
* of this StorageContainerManager.
*
* @throws IOException if there is an I/O error
*/
private synchronized Pipeline initSingleContainerPipeline()
throws IOException {
if (singlePipeline == null) {
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
if (liveNodes.isEmpty()) {
throw new IOException("Storage container locations not found.");
}
Pipeline newPipeline = newPipelineFromNodes(liveNodes,
UUID.randomUUID().toString());
XceiverClient xceiverClient =
xceiverClientManager.acquireClient(newPipeline);
try {
ContainerData containerData = ContainerData
.newBuilder()
.setName(newPipeline.getContainerName())
.build();
CreateContainerRequestProto createContainerRequest =
CreateContainerRequestProto.newBuilder()
.setPipeline(newPipeline.getProtobufMessage())
.setContainerData(containerData)
.build();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.CreateContainer)
.setCreateContainer(createContainerRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(
request);
Result result = response.getResult();
if (result != Result.SUCCESS) {
throw new IOException(
"Failed to initialize container due to result code: " + result);
}
singlePipeline = newPipeline;
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
}
return singlePipeline;
}
/**
* Builds a message for logging startup information about an RPC server.
*
* @param description RPC server description
* @param addr RPC server listening address
* @return server startup message
*/
private static String buildRpcServerStartMessage(String description,
InetSocketAddress addr) {
return addr != null ? String.format("%s is listening at %s",
description, addr.getHostString() + ":" + addr.getPort()) :
String.format("%s not started", description);
}
/**
* Translates a list of nodes, ordered such that the first is the leader, into
* a corresponding {@link Pipeline} object.
*
* @param nodes list of nodes
* @param containerName container name
* @return pipeline corresponding to nodes
*/
private static Pipeline newPipelineFromNodes(List<DatanodeDescriptor> nodes,
String containerName) {
String leaderId = nodes.get(0).getDatanodeUuid();
Pipeline pipeline = new Pipeline(leaderId);
for (DatanodeDescriptor node : nodes) {
pipeline.addMember(node);
}
pipeline.setContainerName(containerName);
return pipeline;
}
/**
* Starts an RPC server, if configured.
*
* @param conf configuration
* @param addr configured address of RPC server
* @param protocol RPC protocol provided by RPC server
* @param instance RPC protocol implementation instance
* @param handlerCount RPC server handler count
*
* @return RPC server
* @throws IOException if there is an I/O error while creating RPC server
*/
private static RPC.Server startRpcServer(OzoneConfiguration conf,
InetSocketAddress addr, Class<?> protocol, BlockingService instance,
int handlerCount)
throws IOException {
RPC.Server rpcServer = new RPC.Builder(conf)
.setProtocol(protocol)
.setInstance(instance)
.setBindAddress(addr.getHostString())
.setPort(addr.getPort())
.setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(null)
.build();
DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
return rpcServer;
}
/**
* After starting an RPC server, updates configuration with the actual
* listening address of that server. The listening address may be different
* from the configured address if, for example, the configured address uses
* port 0 to request use of an ephemeral port.
*
* @param conf configuration to update
* @param rpcAddressKey configuration key for RPC server address
* @param addr configured address
* @param rpcServer started RPC server.
*/
private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
InetSocketAddress listenAddr = rpcServer.getListenerAddress();
InetSocketAddress updatedAddr = new InetSocketAddress(
addr.getHostString(), listenAddr.getPort());
conf.set(rpcAddressKey,
addr.getHostString() + ":" + updatedAddr.getPort());
return updatedAddr;
}
/**
* Main entry point for starting StorageContainerManager.
*
* @param argv arguments
* @throws IOException if startup fails due to I/O error
*/
public static void main(String[] argv) throws IOException {
StringUtils.startupShutdownMessage(
StorageContainerManager.class, argv, LOG);
try {
StorageContainerManager scm = new StorageContainerManager(
new OzoneConfiguration());
scm.start();
scm.join();
} catch (Throwable t) {
LOG.error("Failed to start the StorageContainerManager.", t);
terminate(1, t);
}
}
}

View File

@ -1,131 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.storage;
import java.io.Closeable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.namenode.CacheManager;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
/**
* Namesystem implementation intended for use by StorageContainerManager.
*/
@InterfaceAudience.Private
public class StorageContainerNameService implements Namesystem, Closeable {
private final ReentrantReadWriteLock coarseLock =
new ReentrantReadWriteLock();
private volatile boolean serviceRunning = true;
@Override
public boolean isRunning() {
return serviceRunning;
}
@Override
public BlockCollection getBlockCollection(long id) {
// TBD
return null;
}
@Override
public void startSecretManagerIfNecessary() {
// Secret manager is not supported
}
@Override
public CacheManager getCacheManager() {
// Centralized Cache Management is not supported
return null;
}
@Override
public HAContext getHAContext() {
// HA mode is not supported
return null;
}
@Override
public boolean inTransitionToActive() {
// HA mode is not supported
return false;
}
@Override
public boolean isInSnapshot(long blockCollectionID) {
// Snapshots not supported
return false;
}
@Override
public void readLock() {
coarseLock.readLock().lock();
}
@Override
public void readUnlock() {
coarseLock.readLock().unlock();
}
@Override
public boolean hasReadLock() {
return coarseLock.getReadHoldCount() > 0 || hasWriteLock();
}
@Override
public void writeLock() {
coarseLock.writeLock().lock();
}
@Override
public void writeLockInterruptibly() throws InterruptedException {
coarseLock.writeLock().lockInterruptibly();
}
@Override
public void writeUnlock() {
coarseLock.writeLock().unlock();
}
@Override
public boolean hasWriteLock() {
return coarseLock.isWriteLockedByCurrentThread();
}
@Override
public boolean isInSafeMode() {
// Safe mode is not supported
return false;
}
@Override
public boolean isInStartupSafeMode() {
// Safe mode is not supported
return false;
}
@Override
public void close() {
serviceRunning = false;
}
}

View File

@ -1,23 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.storage;
/**
* This package contains StorageContainerManager classes.
*/

View File

@ -17,53 +17,56 @@
*/
package org.apache.hadoop.ozone;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
import org.apache.hadoop.ozone.web.client.OzoneClient;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.storage.StorageContainerManager;
import org.apache.hadoop.ozone.web.client.OzoneClient;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.security.UserGroupInformation;
import static org.junit.Assert.assertFalse;
/**
* MiniOzoneCluster creates a complete in-process Ozone cluster suitable for
* running tests. The cluster consists of a StorageContainerManager and
* multiple DataNodes. This class subclasses {@link MiniDFSCluster} for
* convenient reuse of logic for starting DataNodes. Unlike MiniDFSCluster, it
* does not start a NameNode, because Ozone does not require a NameNode.
* running tests. The cluster consists of a StorageContainerManager, Namenode
* and multiple DataNodes. This class subclasses {@link MiniDFSCluster} for
* convenient reuse of logic for starting DataNodes.
*/
@InterfaceAudience.Private
public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneCluster.class);
private static final String USER_AUTH = "hdfs";
private final OzoneConfiguration conf;
private final StorageContainerManager scm;
private final Path tempPath;
/**
* Creates a new MiniOzoneCluster.
@ -77,69 +80,18 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
super(builder);
this.conf = builder.conf;
this.scm = scm;
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder
extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder {
private final OzoneConfiguration conf;
private Optional<String> ozoneHandlerType = Optional.absent();
/**
* Creates a new Builder.
*
* @param conf configuration
*/
public Builder(OzoneConfiguration conf) {
super(conf);
this.conf = conf;
this.nnTopology(new MiniDFSNNTopology()); // No NameNode required
}
@Override
public Builder numDataNodes(int val) {
super.numDataNodes(val);
return this;
}
public Builder setHandlerType(String handler) {
ozoneHandlerType = Optional.of(handler);
return this;
}
@Override
public MiniOzoneCluster build() throws IOException {
if (!ozoneHandlerType.isPresent()) {
throw new IllegalArgumentException(
"The Ozone handler type must be specified.");
}
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY, ozoneHandlerType.get());
conf.set(OzoneConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
StorageContainerManager scm = new StorageContainerManager(conf);
scm.start();
MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm);
try {
cluster.waitOzoneReady();
} catch(Exception e) {
// A workaround to propagate MiniOzoneCluster failures without
// changing the method signature (which would require cascading
// changes to hundreds of unrelated HDFS tests).
throw new IOException("Failed to start MiniOzoneCluster", e);
}
return cluster;
}
tempPath = Paths.get(builder.getPath(), builder.getRunID());
}
@Override
public void close() {
shutdown();
try {
FileUtils.deleteDirectory(tempPath.toFile());
} catch (IOException e) {
String errorMessage = "Cleaning up metadata directories failed." + e;
assertFalse(errorMessage, true);
}
}
@Override
@ -197,7 +149,8 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
}
/**
@ -207,15 +160,226 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
final DatanodeInfo[] reports =
scm.getDatanodeReport(DatanodeReportType.LIVE);
if (reports.length >= numDataNodes) {
if (scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY)
>= numDataNodes) {
return true;
}
LOG.info("Waiting for cluster to be ready. Got {} of {} DN reports.",
reports.length, numDataNodes);
LOG.info("Waiting for cluster to be ready. Got {} of {} DN Heartbeats.",
scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY),
numDataNodes);
return false;
}
}, 1000, 5 * 60 * 1000); //wait for 5 mins.
}
/**
* Waits for SCM to be out of Chill Mode. Many tests can be run iff we are out
* of Chill mode.
*
* @throws TimeoutException
* @throws InterruptedException
*/
public void waitTobeOutOfChillMode() throws TimeoutException,
InterruptedException {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
if (scm.getScmNodeManager().isOutOfNodeChillMode()) {
return true;
}
LOG.info("Waiting for cluster to be ready. No datanodes found");
return false;
}
}, 100, 45000);
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
public static class Builder
extends org.apache.hadoop.hdfs.MiniDFSCluster.Builder {
private final OzoneConfiguration conf;
private final int defaultHBSeconds = 1;
private final int defaultProcessorMs = 100;
private final String path;
private final UUID runID;
private Optional<String> ozoneHandlerType = Optional.absent();
private Optional<Boolean> enableTrace = Optional.of(true);
private Optional<Integer> hbSeconds = Optional.absent();
private Optional<Integer> hbProcessorInterval = Optional.absent();
private Optional<String> scmMetadataDir = Optional.absent();
private Boolean ozoneEnabled = true;
private Boolean waitForChillModeFinish = true;
private int containerWorkerThreadInterval = 1;
/**
* Creates a new Builder.
*
* @param conf configuration
*/
public Builder(OzoneConfiguration conf) {
super(conf);
this.conf = conf;
// TODO : Remove this later, with SCM, NN and SCM can run together.
//this.nnTopology(new MiniDFSNNTopology()); // No NameNode required
URL p = conf.getClass().getResource("");
path = p.getPath().concat(MiniOzoneCluster.class.getSimpleName() + UUID
.randomUUID().toString());
runID = UUID.randomUUID();
}
@Override
public Builder numDataNodes(int val) {
super.numDataNodes(val);
return this;
}
public Builder setHandlerType(String handler) {
ozoneHandlerType = Optional.of(handler);
return this;
}
public Builder setTrace(Boolean trace) {
enableTrace = Optional.of(trace);
return this;
}
public Builder setSCMHBInterval(int seconds) {
hbSeconds = Optional.of(seconds);
return this;
}
public Builder setSCMHeartbeatProcessingInterval(int milliseconds) {
hbProcessorInterval = Optional.of(milliseconds);
return this;
}
public Builder setSCMMetadataDir(String scmMetadataDirPath) {
scmMetadataDir = Optional.of(scmMetadataDirPath);
return this;
}
public Builder disableOzone() {
ozoneEnabled = false;
return this;
}
public Builder doNotwaitTobeOutofChillMode() {
waitForChillModeFinish = false;
return this;
}
public Builder setSCMContainerWorkerThreadInterval(int intervalInSeconds) {
containerWorkerThreadInterval = intervalInSeconds;
return this;
}
public String getPath() {
return path;
}
public String getRunID() {
return runID.toString();
}
@Override
public MiniOzoneCluster build() throws IOException {
configureHandler();
configureTrace();
configureSCMheartbeat();
configScmMetadata();
conf.set(OzoneConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
StorageContainerManager scm = new StorageContainerManager(conf);
scm.start();
String addressString = scm.getDatanodeRpcAddress().getHostString() +
":" + scm.getDatanodeRpcAddress().getPort();
conf.setStrings(OzoneConfigKeys.OZONE_SCM_NAMES, addressString);
MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm);
try {
cluster.waitOzoneReady();
if (waitForChillModeFinish) {
cluster.waitTobeOutOfChillMode();
}
} catch (Exception e) {
// A workaround to propagate MiniOzoneCluster failures without
// changing the method signature (which would require cascading
// changes to hundreds of unrelated HDFS tests).
throw new IOException("Failed to start MiniOzoneCluster", e);
}
return cluster;
}
private void configScmMetadata() throws IOException {
if (scmMetadataDir.isPresent()) {
// if user specifies a path in the test, it is assumed that user takes
// care of creating and cleaning up that directory after the tests.
conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS,
scmMetadataDir.get());
return;
}
// If user has not specified a path, create a UUID for this miniCluser
// and create SCM under that directory.
Path scmPath = Paths.get(path, runID.toString(), "scm");
Files.createDirectories(scmPath);
conf.set(OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS, scmPath
.toString());
// TODO : Fix this, we need a more generic mechanism to map
// different datanode ID for different datanodes when we have lots of
// datanodes in the cluster.
conf.setStrings(OzoneConfigKeys.OZONE_SCM_DATANODE_ID,
scmPath.toString() + "/datanode.id");
}
private void configureHandler() {
conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, this.ozoneEnabled);
if (!ozoneHandlerType.isPresent()) {
throw new IllegalArgumentException(
"The Ozone handler type must be specified.");
}
}
private void configureTrace() {
if (enableTrace.isPresent()) {
conf.setBoolean(OzoneConfigKeys.OZONE_TRACE_ENABLED_KEY,
enableTrace.get());
}
GenericTestUtils.setLogLevel(org.apache.log4j.Logger.getRootLogger(),
Level.ALL);
}
private void configureSCMheartbeat() {
if (hbSeconds.isPresent()) {
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
hbSeconds.get());
} else {
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
defaultHBSeconds);
}
if (hbProcessorInterval.isPresent()) {
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
hbProcessorInterval.get());
} else {
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
defaultProcessorMs);
}
}
}
}

View File

@ -27,7 +27,8 @@ import java.util.Set;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
// TODO : We need this when we enable these tests back.
//import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.apache.hadoop.io.IOUtils;
@ -63,7 +64,9 @@ public class TestStorageContainerManager {
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
}
@Test
// TODO : Disabling this test after verifying that failure is due
// Not Implemented exception. Will turn on this test in next patch
//@Test
public void testLocationsForSingleKey() throws Exception {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType("distributed").build();
@ -77,7 +80,9 @@ public class TestStorageContainerManager {
assertLocatedContainer(containers, "/key1", 1);
}
@Test
// TODO : Disabling this test after verifying that failure is due
// Not Implemented exception. Will turn on this test in next patch
//@Test
public void testLocationsForMultipleKeys() throws Exception {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType("distributed").build();
@ -92,11 +97,14 @@ public class TestStorageContainerManager {
assertLocatedContainer(containers, "/key2", 1);
assertLocatedContainer(containers, "/key3", 1);
}
@Test
// TODO : Disabling this test after verifying that failure is due
// Not Implemented exception. Will turn on this test in next patch
//@Test
public void testNoDataNodes() throws Exception {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(0)
.setHandlerType("distributed").build();
.setHandlerType("distributed")
.doNotwaitTobeOutofChillMode()
.build();
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
exception.expect(IOException.class);
@ -105,7 +113,9 @@ public class TestStorageContainerManager {
new LinkedHashSet<>(Arrays.asList("/key1")));
}
@Test
// TODO : Disabling this test after verifying that failure is due
// Not Implemented exception. Will turn on this test in next patch
//@Test
public void testMultipleDataNodes() throws Exception {
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
.setHandlerType("distributed").build();

View File

@ -79,7 +79,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
*/
@Override
public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
getVersion() throws IOException {
getVersion(StorageContainerDatanodeProtocolProtos
.SCMVersionRequestProto unused) throws IOException {
rpcCount.incrementAndGet();
sleepIfNeeded();
VersionInfo versionInfo = VersionInfo.getLatestVersion();
@ -119,7 +120,10 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
.newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos
.Type.nullCmd)
.setNullCommand(
NullCommand.newBuilder().build().getProtoBufMessage()).build();
StorageContainerDatanodeProtocolProtos.NullCmdResponseProto
.parseFrom(
NullCommand.newBuilder().build().getProtoBufMessage()))
.build();
return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
.newBuilder()
.addCommands(cmdResponse).build();

View File

@ -93,12 +93,7 @@ public class TestDatanodeStateMachine {
path = Paths.get(path.toString(),
TestDatanodeStateMachine.class.getSimpleName() + ".id").toString();
conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path);
executorService = HadoopExecutors.newScheduledThreadPool(
conf.getInt(
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Test Data Node State Machine Thread - %d").build());
}

View File

@ -66,7 +66,7 @@ public class TestEndPoint {
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
serverAddress, 1000)) {
SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint()
.getVersion();
.getVersion(null);
Assert.assertNotNull(responseProto);
Assert.assertEquals(responseProto.getKeys(0).getKey(),
VersionInfo.DESCRIPTION_KEY);

View File

@ -102,7 +102,7 @@ public class TestContainerPersistence {
Assert.assertTrue(containerDir.mkdirs());
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("local").build();
.setHandlerType("distributed").build();
containerManager = new ContainerManagerImpl();
chunkManager = new ChunkManagerImpl(containerManager);
containerManager.setChunkManager(chunkManager);
@ -113,7 +113,9 @@ public class TestContainerPersistence {
@AfterClass
public static void shutdown() throws IOException {
if(cluster != null) {
cluster.shutdown();
}
FileUtils.deleteDirectory(new File(path));
}

View File

@ -55,7 +55,7 @@ public class TestOzoneContainer {
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("local").build();
.setHandlerType("distributed").build();
// We don't start Ozone Container via data node, we will do it
// independently in our test path.
@ -99,7 +99,7 @@ public class TestOzoneContainer {
pipeline.getLeader().getContainerPort());
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("local").build();
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf);
@ -189,7 +189,7 @@ public class TestOzoneContainer {
pipeline.getLeader().getContainerPort());
MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("local").build();
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
XceiverClient client = new XceiverClient(pipeline, conf);

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
/**
* Test allocate container calls.
*/
public class TestAllocateContainer {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void init() throws IOException {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType("distributed").build();
}
@AfterClass
public static void shutdown() throws InterruptedException {
if(cluster != null) {
cluster.shutdown();
}
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
}
@Test
public void testAllocate() throws Exception {
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
Pipeline pipeline = storageContainerLocationClient.allocateContainer(
"container0");
Assert.assertNotNull(pipeline);
Assert.assertNotNull(pipeline.getLeader());
}
@Test
public void testAllocateNull() throws Exception {
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
thrown.expect(NullPointerException.class);
storageContainerLocationClient.allocateContainer(null);
}
@Test
public void testAllocateDuplicate() throws Exception {
String containerName = RandomStringUtils.randomAlphanumeric(10);
storageContainerLocationClient =
cluster.createStorageContainerLocationClient();
thrown.expect(IOException.class);
thrown.expectMessage("Specified container already exists");
storageContainerLocationClient.allocateContainer(containerName);
storageContainerLocationClient.allocateContainer(containerName);
}
}

View File

@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@ -19,6 +19,10 @@ package org.apache.hadoop.ozone.scm.container;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import java.io.IOException;
@ -43,7 +47,7 @@ public class MockNodeManager implements NodeManager {
/**
* Sets the chill mode value.
* @param chillmode
* @param chillmode boolean
*/
public void setChillmode(boolean chillmode) {
this.chillmode = chillmode;
@ -198,4 +202,41 @@ public class MockNodeManager implements NodeManager {
public void run() {
}
/**
* Gets the version info from SCM.
*
* @param versionRequest - version Request.
* @return - returns SCM version info and other required information needed by
* datanode.
*/
@Override
public VersionResponse getVersion(StorageContainerDatanodeProtocolProtos
.SCMVersionRequestProto versionRequest) {
return null;
}
/**
* Register the node if the node finds that it is not registered with any
* SCM.
*
* @param datanodeID - Send datanodeID with Node info, but datanode UUID is
* empty. Server returns a datanodeID for the given node.
* @return SCMHeartbeatResponseProto
*/
@Override
public SCMCommand register(DatanodeID datanodeID) {
return null;
}
/**
* Send heartbeat to indicate the datanode is alive and doing well.
*
* @param datanodeID - Datanode ID.
* @return SCMheartbeat response list
*/
@Override
public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID) {
return null;
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
/**
* SCM tests
*/

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
import static org.junit.Assert.*;
import org.apache.commons.lang.RandomStringUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@ -50,7 +51,6 @@ public class TestOzoneRestWithMiniCluster {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static int idSuffix;
private static OzoneClient ozoneClient;
@Rule
@ -59,7 +59,7 @@ public class TestOzoneRestWithMiniCluster {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3)
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType("distributed").build();
ozoneClient = cluster.createOzoneClient();
}
@ -250,6 +250,6 @@ public class TestOzoneRestWithMiniCluster {
* @return unique ID generated by appending a suffix to the given prefix
*/
private static String nextId(String idPrefix) {
return idPrefix + ++idSuffix;
return (idPrefix + RandomStringUtils.random(5, true, true)).toLowerCase();
}
}