diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java index c6e47c899ea..fc9092a6e89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.scm; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; @@ -87,6 +88,15 @@ public class XceiverClient implements XceiverClientSpi { channelFuture = b.connect(leader.getHostName(), port).sync(); } + /** + * Returns true if the XceiverClient is connected to a server. + * @return True if the connection is alive, false otherwise. + */ + @VisibleForTesting + public boolean isConnected() { + return channelFuture.channel().isActive(); + } + @Override public void close() { if(group != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index f42a9565ae0..9d242d6deba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -30,6 +30,20 @@ public final class OzoneConfigKeys { public static final String DFS_CONTAINER_IPC_PORT = "dfs.container.ipc"; public static final int DFS_CONTAINER_IPC_PORT_DEFAULT = 50011; + + /** + * When set to true, allocate a random free port for the ozone container, + * so that a mini cluster is able to launch multiple container servers on a node. + * + * When set to false (the default), the container port is fixed, as specified by + * DFS_CONTAINER_IPC_PORT_DEFAULT.
+ */ + public static final String DFS_CONTAINER_IPC_RANDOM_PORT = + "dfs.container.ipc.random.port"; + public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT = + false; + public static final String OZONE_LOCALSTORAGE_ROOT = "ozone.localstorage.root"; public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index 4aff9728d4b..05dd41e2e1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@ -22,6 +22,8 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl; import org.apache.hadoop.utils.LevelDBStore; @@ -30,9 +32,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import static org.apache.commons.io.FilenameUtils.removeExtension; import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos @@ -367,4 +373,83 @@ public final class ContainerUtils { FileUtils.forceDelete(containerPath.toFile()); FileUtils.forceDelete(metaPath.toFile()); } + + /** + * Write datanode ID protobuf messages to an ID file. + * The old ID file will be overwritten. + * + * @param ids A list of {@link DatanodeID} + * @param path Local ID file path + * @throws IOException When a read/write error occurs + */ + private synchronized static void writeDatanodeIDs(List<DatanodeID> ids, + File path) throws IOException { + if (path.exists()) { + if (!path.delete() || !path.createNewFile()) { + throw new IOException("Unable to overwrite the datanode ID file."); + } + } else { + if (!path.getParentFile().exists() && + !path.getParentFile().mkdirs()) { + throw new IOException("Unable to create datanode ID directories."); + } + } + try (FileOutputStream out = new FileOutputStream(path)) { + for (DatanodeID id : ids) { + HdfsProtos.DatanodeIDProto dnId = id.getProtoBufMessage(); + dnId.writeDelimitedTo(out); + } + } + } + + /** + * Persist a {@link DatanodeID} to a local file. + * It reads the existing IDs first and appends a new entry only if the ID + * is new, so that the file does not keep growing in a dirty environment.
+ * + * @throws IOException when a read/write error occurs + */ + public synchronized static void writeDatanodeIDTo(DatanodeID dnID, + File path) throws IOException { + List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(path); + // Only create or overwrite the file + // if the ID doesn't exist in the ID file + for (DatanodeID id : ids) { + if (id.getProtoBufMessage() + .equals(dnID.getProtoBufMessage())) { + return; + } + } + ids.add(dnID); + writeDatanodeIDs(ids, path); + } + + /** + * Read {@link DatanodeID} from a local ID file and return a list of + * datanode IDs. If the ID file doesn't exist, an empty list is returned. + * + * @param path ID file local path + * @return A list of {@link DatanodeID} + * @throws IOException If the ID file is malformed, or on other I/O errors + */ + public synchronized static List<DatanodeID> readDatanodeIDsFrom(File path) + throws IOException { + List<DatanodeID> ids = new ArrayList<>(); + if (!path.exists()) { + return ids; + } + try (FileInputStream in = new FileInputStream(path)) { + while (in.available() > 0) { + try { + HdfsProtos.DatanodeIDProto id = + HdfsProtos.DatanodeIDProto.parseDelimitedFrom(in); + ids.add(DatanodeID.getFromProtoBuf(id)); + } catch (IOException e) { + throw new IOException("Failed to parse Datanode ID from " + + path.getAbsolutePath(), e); + } + } + } + return ids; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c8f6dc7f378..600884421f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.util.Time; @@ -44,13 +45,16 @@ public class DatanodeStateMachine implements Closeable { private final long heartbeatFrequency; private StateContext context; private final OzoneContainer container; + private DatanodeID datanodeID = null; /** * Constructs a datanode state machine. * + * @param datanodeID - DatanodeID used to identify a datanode * @param conf - Configuration. */ - public DatanodeStateMachine(Configuration conf) throws IOException { + public DatanodeStateMachine(DatanodeID datanodeID, + Configuration conf) throws IOException { this.conf = conf; executorService = HadoopExecutors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true) @@ -60,6 +64,26 @@ public class DatanodeStateMachine implements Closeable { heartbeatFrequency = TimeUnit.SECONDS.toMillis( OzoneClientUtils.getScmHeartbeatInterval(conf)); container = new OzoneContainer(conf); + this.datanodeID = datanodeID; + } + + public DatanodeStateMachine(Configuration conf) + throws IOException { + this(null, conf); + } + + public void setDatanodeID(DatanodeID datanodeID) { + this.datanodeID = datanodeID; + } + + /** + * + * Returns the DatanodeID if set, null otherwise.
+ * + * @return datanodeID + */ + public DatanodeID getDatanodeID() { + return this.datanodeID; } /** @@ -71,10 +95,14 @@ public class DatanodeStateMachine implements Closeable { return connectionManager; } + public OzoneContainer getContainer() { + return this.container; + } + /** * Runs the state machine at a fixed frequency. */ - public void start() throws IOException { + private void start() throws IOException { long now = 0; long nextHB = 0; container.start(); @@ -216,12 +244,14 @@ } } - public static DatanodeStateMachine initStateMachine(Configuration conf) - throws IOException { - DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf); + /** + * Start the datanode state machine as a single-threaded daemon. + */ + public void startDaemon() { Runnable startStateMachineTask = () -> { try { - stateMachine.start(); + start(); + LOG.info("Ozone container server started."); } catch (Exception ex) { LOG.error("Unable to start the DatanodeState Machine", ex); } @@ -231,6 +261,32 @@ .setNameFormat("Datanode State Machine Thread - %d") .build().newThread(startStateMachineTask); thread.start(); - return stateMachine; + } + + /** + * Stop the daemon thread of the datanode state machine. + */ + public synchronized void stopDaemon() { + try { + context.setState(DatanodeStates.SHUTDOWN); + this.close(); + LOG.info("Ozone container server stopped."); + } catch (IOException e) { + LOG.error("Failed to stop the ozone container server.", e); + } + } + + /** + * + * Check if the datanode state machine daemon is stopped. + * + * @return True if the datanode state machine daemon is stopped, + * false otherwise. + */ + @VisibleForTesting + public boolean isDaemonStopped() { + return this.executorService.isShutdown() + && this.getContext().getExecutionCount() == 0 + && this.getContext().getState() == DatanodeStates.SHUTDOWN; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 15a241edf34..e02079145e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -80,6 +80,15 @@ public class StateContext { return parent; } + /** + * Get the container server port. + * @return The container server port if available, -1 otherwise + */ + public int getContainerPort() { + return parent == null ? + -1 : parent.getContainer().getContainerServerPort(); + } + /** * Returns true if we are entering a new state.
* diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java index 9e95f533958..042739205d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java @@ -18,14 +18,19 @@ package org.apache.hadoop.ozone.container.common.states.datanode; import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.scm.ScmConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collection; import java.util.concurrent.Callable; @@ -87,9 +92,29 @@ public class InitDatanodeState implements DatanodeState, connectionManager.addSCMServer(addr); } } + + // If datanode ID is set, persist it to the ID file. + persistContainerDatanodeID(); + return this.context.getState().getNextState(); } + /** + * Set the Ozone container port on the datanode ID, + * and persist the ID to a local file. + */ + private void persistContainerDatanodeID() throws IOException { + String dataNodeIDPath = conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID); + File idPath = new File(dataNodeIDPath); + int containerPort = this.context.getContainerPort(); + DatanodeID datanodeID = this.context.getParent().getDatanodeID(); + if (datanodeID != null) { + datanodeID.setContainerPort(containerPort); + ContainerUtils.writeDatanodeIDTo(datanodeID, idPath); + LOG.info("Datanode ID is persisted to {}", dataNodeIDPath); + } + } + /** * Called before entering this state.
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java index 29922189a15..590df2df914 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -19,8 +19,7 @@ package org.apache.hadoop.ozone.container.common.states.datanode; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; @@ -35,16 +34,11 @@ import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; -import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -83,63 +77,30 @@ public class RunningDatanodeState implements DatanodeState { private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto readPersistedDatanodeID(Path idPath) throws IOException { Preconditions.checkNotNull(idPath); - StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto - containerIDProto; - try (FileInputStream stream = new FileInputStream(idPath.toFile())) { - containerIDProto = StorageContainerDatanodeProtocolProtos - .ContainerNodeIDProto.parseFrom(stream); - return containerIDProto; + DatanodeID datanodeID = null; + List<DatanodeID> datanodeIDs = + ContainerUtils.readDatanodeIDsFrom(idPath.toFile()); + int containerPort = this.context.getContainerPort(); + for (DatanodeID dnId : datanodeIDs) { + if (dnId.getContainerPort() == containerPort) { + datanodeID = dnId; + break; + } } - } - /** - * Create a DatanodeID from the datanode information. - * - * @return DatanodeID - * @throws UnknownHostException - */ - private DatanodeID createDatanodeID() throws UnknownHostException { - DatanodeID temp = new DatanodeID( - //TODO : Replace this with proper network and kerberos - // support code. - InetAddress.getLocalHost().getHostAddress(), - DataNode.getHostName(conf), - UUID.randomUUID().toString(), - 0, /** XferPort - SCM does not use this port */ - 0, /** Info port - SCM does not use this port */ - 0, /** Info Secure Port - SCM does not use this port */ - 0); /** IPC port - SCM does not use this port */ - - // TODO: make this dynamically discoverable. SCM can hand out this - // port number to calling applications. This makes it easy to run multiple - // container endpoints on the same machine.
- temp.setContainerPort(OzoneClientUtils.getContainerPort(conf)); - return temp; - } - - /** - * Creates a new ContainerID that persists both DatanodeID and ClusterID. - * - * @param idPath Path to the id file. - * @return ContainerNodeIDProto - * @throws UnknownHostException - */ - private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto - createNewContainerID(Path idPath) - throws IOException { - - if(!idPath.getParent().toFile().exists() && - !idPath.getParent().toFile().mkdirs()) { - LOG.error("Failed to create container ID locations. Path: {}", - idPath.getParent()); - throw new IOException("Unable to create container ID directories."); - } - StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto - containerIDProto = StorageContainerDatanodeProtocolProtos - .ContainerNodeIDProto.newBuilder() - .setDatanodeID(createDatanodeID().getProtoBufMessage()).build(); - try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) { - stream.write(containerIDProto.toByteArray()); + if (datanodeID == null) { + throw new IOException("No valid datanode ID found from " + + idPath.toFile().getAbsolutePath() + + " that matches container port " + + containerPort); + } else { + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + containerIDProto = + StorageContainerDatanodeProtocolProtos + .ContainerNodeIDProto + .newBuilder() + .setDatanodeID(datanodeID.getProtoBufMessage()) + .build(); return containerIDProto; } } @@ -171,15 +132,7 @@ public class RunningDatanodeState implements DatanodeState { } catch (IOException ex) { LOG.trace("Not able to find container Node ID, creating it.", ex); } - // Not found, let us create a new datanode ID, persist it and return that - // info to SCM. - try { - nodeID = createNewContainerID(Paths.get(dataNodeIDPath)); - LOG.trace("Created Node ID :", nodeID.getDatanodeID().getDatanodeUuid()); - return nodeID; - } catch (IOException ex) { - LOG.error("Creating new node ID failed.", ex); - } + this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java index 264ba4afe02..cd2146b1a1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java @@ -29,15 +29,20 @@ import io.netty.handler.logging.LoggingHandler; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.ServerSocket; /** * Creates a netty server endpoint that acts as the communication layer for * Ozone containers. 
*/ public final class XceiverServer implements XceiverServerSpi { - private final int port; + private static final Logger + LOG = LoggerFactory.getLogger(XceiverServer.class); + private int port; private final ContainerDispatcher storageContainer; private EventLoopGroup bossGroup; @@ -52,11 +57,30 @@ public final class XceiverServer implements XceiverServerSpi { public XceiverServer(Configuration conf, ContainerDispatcher dispatcher) { Preconditions.checkNotNull(conf); + this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + // Get an available port on the current node and + // use it as the container port + if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + this.port = socket.getLocalPort(); + LOG.info("Found a free port for the server: {}", this.port); + } catch (IOException e) { + LOG.error("Unable to find a random free port for the server, " + + "falling back to the default port {}", this.port, e); + } + } this.storageContainer = dispatcher; } + @Override + public int getIPCPort() { + return this.port; + } + @Override public void start() throws IOException { bossGroup = new NioEventLoopGroup(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java index f274151a3f3..e5b04975b9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java @@ -28,4 +28,7 @@ public interface XceiverServerSpi { /** Stops a running server. */ void stop(); + + /** Get the server IPC port. */ + int getIPCPort(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index e251da15777..5db1ce8b0f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -167,4 +167,13 @@ public class OzoneContainer { public SCMNodeReport getNodeReport() throws IOException { return this.manager.getNodeReport(); } + + /** + * Returns the container server IPC port. + * + * @return Container server IPC port. + */ + public int getContainerServerPort() { + return server.getIPCPort(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index d018b85f374..a380fc8301c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -173,6 +173,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster LOG.info("Waiting for cluster to be ready.
Got {} of {} DN Heartbeats.", scm.getNodeCount(SCMNodeManager.NODESTATE.HEALTHY), numDataNodes); + return false; } }, 1000, 5 * 60 * 1000); //wait for 5 mins. @@ -228,6 +229,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster private Boolean ozoneEnabled = true; private Boolean waitForChillModeFinish = true; private int containerWorkerThreadInterval = 1; + private Boolean randomContainerPort = true; /** * Creates a new Builder. @@ -247,6 +249,11 @@ public final class MiniOzoneCluster extends MiniDFSCluster runID = UUID.randomUUID(); } + public Builder setRandomContainerPort(boolean randomPort) { + this.randomContainerPort = randomPort; + return this; + } + @Override public Builder numDataNodes(int val) { super.numDataNodes(val); @@ -319,6 +326,10 @@ public final class MiniOzoneCluster extends MiniDFSCluster conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0"); conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0"); + // Use random ports for ozone containers in mini cluster, + // in order to launch multiple container servers per node. + conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, + randomContainerPort); StorageContainerManager scm = new StorageContainerManager(conf); scm.start(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java new file mode 100644 index 00000000000..085cb8622f0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.hadoop.scm.ScmConfigKeys; +import org.apache.hadoop.scm.XceiverClient; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.test.PathUtils; +import org.apache.hadoop.test.TestGenericTestUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.junit.Assert.*; + +/** + * Test cases for mini ozone cluster. + */ +public class TestMiniOzoneCluster { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + private final static File TEST_ROOT = TestGenericTestUtils.getTestDir(); + private final static File WRITE_TMP = new File(TEST_ROOT, "write"); + private final static File READ_TMP = new File(TEST_ROOT, "read"); + + @BeforeClass + public static void setup() { + conf = new OzoneConfiguration(); + WRITE_TMP.mkdirs(); + READ_TMP.mkdirs(); + WRITE_TMP.deleteOnExit(); + READ_TMP.deleteOnExit(); + } + + @AfterClass + public static void cleanup() { + if (cluster != null) { + cluster.shutdown(); + cluster.close(); + } + } + + @Test(timeout = 30000) + public void testStartMultipleDatanodes() throws Exception { + final int numberOfNodes = 3; + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(numberOfNodes) + .setHandlerType("distributed").build(); + + // Make sure the datanode ID file is correct + File idPath = new File( + conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID)); + assertTrue(idPath.exists()); + List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idPath); + assertEquals(numberOfNodes, ids.size()); + + List<DataNode> datanodes = cluster.getDataNodes(); + assertEquals(numberOfNodes, datanodes.size()); + for (DataNode dn : datanodes) { + // Each datanode ID should match an entry in the ID file + assertTrue("Datanode ID not found in ID file", + ids.contains(dn.getDatanodeId())); + + // Create a single-member pipeline + String containerName = OzoneUtils.getRequestID(); + DatanodeID dnId = dn.getDatanodeId(); + Pipeline pipeline = new Pipeline(dnId.getDatanodeUuid()); + pipeline.addMember(dnId); + pipeline.setContainerName(containerName); + + // Verify the client is able to connect to the container + try (XceiverClient client = new XceiverClient(pipeline, conf)) { + client.connect(); + assertTrue(client.isConnected()); + } + } + } + + @Test + public void testDatanodeIDPersistent() throws Exception { + // Generate IDs for testing + DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1); + DatanodeID id2 = DFSTestUtil.getLocalDatanodeID(2); + DatanodeID id3 = DFSTestUtil.getLocalDatanodeID(3); + id1.setContainerPort(1); + id2.setContainerPort(2); + id3.setContainerPort(3); + + // Write a single ID to the file and read it out + File validIdsFile = new File(WRITE_TMP, "valid-values.id"); + validIdsFile.delete(); + ContainerUtils.writeDatanodeIDTo(id1, validIdsFile); + List<DatanodeID> validIds = ContainerUtils + .readDatanodeIDsFrom(validIdsFile); + assertEquals(1, validIds.size()); + DatanodeID id11 = validIds.iterator().next(); + assertEquals(id11, id1); + assertEquals(id11.getProtoBufMessage(), id1.getProtoBufMessage()); + + // Write should avoid duplicate entries + File noDupIDFile = new File(WRITE_TMP, "no-dup-values.id"); + noDupIDFile.delete(); + ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile); + ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile); + ContainerUtils.writeDatanodeIDTo(id1, noDupIDFile); + ContainerUtils.writeDatanodeIDTo(id2, noDupIDFile); + ContainerUtils.writeDatanodeIDTo(id3, noDupIDFile); + + List<DatanodeID> noDupIDs = ContainerUtils + .readDatanodeIDsFrom(noDupIDFile); + assertEquals(3, noDupIDs.size()); + assertTrue(noDupIDs.contains(id1)); + assertTrue(noDupIDs.contains(id2)); + assertTrue(noDupIDs.contains(id3)); + + // Write should fail if unable to create the file or directory + File invalidPath = new File(WRITE_TMP, "an/invalid/path"); + try { + ContainerUtils.writeDatanodeIDTo(id1, invalidPath); + fail("Write to an invalid path should fail"); + } catch (Exception e) { + assertTrue(e instanceof IOException); + } + + // Read should return an empty list if the file doesn't exist + File nonExistFile = new File(READ_TMP, "non_exist.id"); + nonExistFile.delete(); + List<DatanodeID> emptyIDs = + ContainerUtils.readDatanodeIDsFrom(nonExistFile); + assertTrue(emptyIDs.isEmpty()); + + // Read should fail if the file is malformed + File malformedFile = new File(READ_TMP, "malformed.id"); + createMalformedIDFile(malformedFile); + try { + ContainerUtils.readDatanodeIDsFrom(malformedFile); + fail("Reading a malformed ID file should fail"); + } catch (Exception e) { + assertTrue(e instanceof IOException); + } + } + + @Test + public void testContainerRandomPort() throws IOException { + Configuration ozoneConf = SCMTestUtils.getConf(); + File testDir = PathUtils.getTestDir(TestOzoneContainer.class); + ozoneConf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath()); + + // Each instance of the state machine creates an ozone container + // that binds to a random port.
+ ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true); + try ( + DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf); + DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf); + DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf); + ) { + HashSet<Integer> ports = new HashSet<>(); + assertTrue(ports.add(sm1.getContainer().getContainerServerPort())); + assertTrue(ports.add(sm2.getContainer().getContainerServerPort())); + assertTrue(ports.add(sm3.getContainer().getContainerServerPort())); + } + + // Turn off the random port flag and test again + ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false); + try ( + DatanodeStateMachine sm1 = new DatanodeStateMachine(ozoneConf); + DatanodeStateMachine sm2 = new DatanodeStateMachine(ozoneConf); + DatanodeStateMachine sm3 = new DatanodeStateMachine(ozoneConf); + ) { + HashSet<Integer> ports = new HashSet<>(); + assertTrue(ports.add(sm1.getContainer().getContainerServerPort())); + assertFalse(ports.add(sm2.getContainer().getContainerServerPort())); + assertFalse(ports.add(sm3.getContainer().getContainerServerPort())); + assertEquals(ports.iterator().next().intValue(), + conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT, + OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT)); + } + } + + private void createMalformedIDFile(File malformedFile) + throws IOException { + malformedFile.delete(); + DatanodeID id1 = DFSTestUtil.getLocalDatanodeID(1); + ContainerUtils.writeDatanodeIDTo(id1, malformedFile); + + try (FileOutputStream out = new FileOutputStream(malformedFile)) { + out.write("malformed".getBytes()); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java index 9498baf1243..e8a1edc64c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -18,7 +18,10 @@ package org.apache.hadoop.ozone.container.common; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; @@ -51,6 +54,7 @@ import java.util.concurrent.TimeoutException; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.scm.ScmConfigKeys .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; +import static org.junit.Assert.assertTrue; /** * Tests the datanode state machine class and its states.
@@ -134,14 +138,18 @@ public class TestDatanodeStateMachine { * @throws InterruptedException */ @Test - public void testDatanodeStateMachineStartThread() throws IOException, + public void testStartStopDatanodeStateMachine() throws IOException, InterruptedException, TimeoutException { try (DatanodeStateMachine stateMachine = - DatanodeStateMachine.initStateMachine(conf)) { + new DatanodeStateMachine(conf)) { + stateMachine.startDaemon(); SCMConnectionManager connectionManager = stateMachine.getConnectionManager(); GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3, 1000, 30000); + + stateMachine.stopDaemon(); + assertTrue(stateMachine.isDaemonStopped()); } } @@ -178,6 +186,15 @@ public class TestDatanodeStateMachine { @Test public void testDatanodeStateContext() throws IOException, InterruptedException, ExecutionException, TimeoutException { + // There is no mini cluster started in this test; + // create an ID file so that the state machine can load a fake datanode ID. + File idPath = new File( + conf.get(ScmConfigKeys.OZONE_SCM_DATANODE_ID)); + idPath.delete(); + DatanodeID dnID = DFSTestUtil.getLocalDatanodeID(); + dnID.setContainerPort(ScmConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); + ContainerUtils.writeDatanodeIDTo(dnID, idPath); + try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf)) { DatanodeStateMachine.DatanodeStates currentState = stateMachine.getContext().getState(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index 11d57cef9cf..1a40b34f8a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -57,6 +57,7 @@ public class TestOzoneContainer { MiniOzoneCluster cluster = null; try { cluster = new MiniOzoneCluster.Builder(conf) + .setRandomContainerPort(false) .setHandlerType("distributed").build(); // We don't start Ozone Container via data node, we will do it // independently in our test path. @@ -108,6 +109,7 @@ pipeline.getLeader().getContainerPort()); cluster = new MiniOzoneCluster.Builder(conf) + .setRandomContainerPort(false) .setHandlerType("distributed").build(); // This client talks to ozone container via datanode. @@ -208,6 +210,7 @@ pipeline.getLeader().getContainerPort()); cluster = new MiniOzoneCluster.Builder(conf) + .setRandomContainerPort(false) .setHandlerType("distributed").build(); // This client talks to ozone container via datanode. @@ -273,6 +276,7 @@ pipeline.getLeader().getContainerPort()); cluster = new MiniOzoneCluster.Builder(conf) + .setRandomContainerPort(false) .setHandlerType("distributed").build(); // This client talks to ozone container via datanode. @@ -364,5 +368,4 @@ } } } - }
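Reviewer note, not part of the patch: the ID-file helpers added to ContainerUtils are easiest to understand as a write/read round trip. Below is a minimal sketch using only APIs that appear in this patch (writeDatanodeIDTo, readDatanodeIDsFrom, DatanodeID.setContainerPort, DFSTestUtil.getLocalDatanodeID); the class name DatanodeIdFileSketch and the temp-file location are hypothetical.

import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class DatanodeIdFileSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical scratch file; any writable path works.
    File idFile = File.createTempFile("datanode", ".id");

    DatanodeID id = DFSTestUtil.getLocalDatanodeID(1);
    id.setContainerPort(50011);

    // Duplicate writes are no-ops: writeDatanodeIDTo reads the file
    // first and rewrites it only when the ID is not already present.
    ContainerUtils.writeDatanodeIDTo(id, idFile);
    ContainerUtils.writeDatanodeIDTo(id, idFile);

    List<DatanodeID> ids = ContainerUtils.readDatanodeIDsFrom(idFile);
    System.out.println("Entries in ID file: " + ids.size()); // prints 1
  }
}

This is also why RunningDatanodeState can select its own entry by container port: one shared ID file can hold one entry per container server on the node.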
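Reviewer note, not part of the patch: the random-port path in XceiverServer is the common bind-to-port-0 idiom. A minimal standalone sketch of that idiom follows; the class name FreePortSketch is hypothetical.

import java.io.IOException;
import java.net.ServerSocket;

public class FreePortSketch {
  public static void main(String[] args) throws IOException {
    int port;
    // Bind to port 0 so the OS assigns a free ephemeral port, record
    // it, then release the socket so the real server can bind to it.
    try (ServerSocket socket = new ServerSocket(0)) {
      socket.setReuseAddress(true);
      port = socket.getLocalPort();
    }
    System.out.println("Container server would bind to port " + port);
    // Caveat: another process may claim the port between close() and
    // the server's bind. The patch accepts this race, which is fine
    // for mini-cluster tests but not for production deployments.
  }
}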