From d10f39e751ab43d7dc60c1ad86ea5cb80d5d095e Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 16 Nov 2016 13:44:24 -0800 Subject: [PATCH] HDFS-11081. Ozone:SCM: Add support for registerNode in datanode. Contributed by Anu Engineer. --- .../hadoop/hdfs/server/datanode/DataNode.java | 2 +- .../apache/hadoop/ozone/OzoneClientUtils.java | 165 ++++++--- .../apache/hadoop/ozone/OzoneConfigKeys.java | 52 ++- .../statemachine/DatanodeStateMachine.java | 213 ++++++++++++ .../statemachine/EndpointStateMachine.java | 265 +++++++++++++++ .../statemachine/SCMConnectionManager.java | 174 ++++++++++ .../common/statemachine/StateContext.java | 191 +++++++++++ .../common/statemachine/package-info.java | 28 ++ .../common/states/DatanodeState.java | 55 +++ .../states/datanode/InitDatanodeState.java | 135 ++++++++ .../states/datanode/RunningDatanodeState.java | 297 +++++++++++++++++ .../common/states/datanode/package-info.java | 21 ++ .../endpoint/HeartbeatEndpointTask.java | 181 ++++++++++ .../states/endpoint/RegisterEndpointTask.java | 198 +++++++++++ .../states/endpoint/VersionEndpointTask.java | 66 ++++ .../common/states/endpoint/package-info.java | 20 ++ .../container/common/states/package-info.java | 18 + .../container/ozoneimpl/OzoneContainer.java | 4 +- .../StorageContainerDatanodeProtocol.java | 58 ++++ .../ozone/protocol/commands/NullCommand.java | 2 +- .../protocol/commands/RegisteredCommand.java | 27 +- .../ozone/protocol/commands/SCMCommand.java | 2 +- ...atanodeProtocolClientSideTranslatorPB.java | 154 +++++++++ .../StorageContainerDatanodeProtocolPB.java | 32 ++ ...atanodeProtocolServerSideTranslatorPB.java | 86 +++++ .../apache/hadoop/ozone/scm/VersionInfo.java | 2 + .../hadoop/ozone/scm/node/SCMNodeManager.java | 37 +-- .../StorageContainerDatanodeProtocol.proto | 70 ++-- .../ozone/container/common/SCMTestUtils.java | 188 +++++++++++ .../ozone/container/common/ScmTestMock.java | 149 +++++++++ .../common/TestDatanodeStateMachine.java | 274 +++++++++++++++ .../ozone/container/common/TestEndPoint.java | 314 ++++++++++++++++++ .../ozone/scm/node/TestNodeManager.java | 85 +---- 33 files changed, 3383 insertions(+), 182 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java create mode 100644 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c1aefe9bfd4..eab19563b8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -873,7 +873,7 @@ public class DataNode extends ReconfigurableBase * @throws UnknownHostException if the dfs.datanode.dns.interface * option is used and the hostname can not be determined */ - private static String getHostName(Configuration config) + public static String getHostName(Configuration config) throws UnknownHostException { String name = config.get(DFS_DATANODE_HOST_NAME_KEY); if (name == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java index 549dc804c54..5d1aed82de2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java @@ -1,43 +1,61 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. */ package org.apache.hadoop.ozone; import com.google.common.base.Optional; +import com.google.common.net.HostAndPort; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.scm.ScmConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.ozone.OzoneConfigKeys.*; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_DEADNODE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_STALENODE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_SCM_STALENODE_INTERVAL_MS; /** * Utility methods for Ozone and Container Clients. @@ -51,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERV public final class OzoneClientUtils { private static final Logger LOG = LoggerFactory.getLogger( OzoneClientUtils.class); + private static final int NO_PORT = -1; /** * The service ID of the solitary Ozone SCM service. 
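Note on the DataNode.getHostName visibility change earlier in this patch: the method is made public so that Ozone datanode code can reuse HDFS's hostname resolution; RunningDatanodeState.createDatanodeID later in this patch calls it. A minimal caller sketch (not part of the patch; the class name is illustrative): // HostnameProbe is illustrative only; DataNode.getHostName(conf) honors // dfs.datanode.hostname and dfs.datanode.dns.interface, and throws // UnknownHostException when the name cannot be determined. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.datanode.DataNode; public final class HostnameProbe { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); System.out.println("Datanode hostname: " + DataNode.getHostName(conf)); } }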
@@ -139,7 +158,7 @@ public final class OzoneClientUtils { return NetUtils.createSocketAddr( host.or(OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" + - port.or(OZONE_SCM_CLIENT_PORT_DEFAULT)); + port.or(OZONE_SCM_CLIENT_PORT_DEFAULT)); } /** @@ -160,7 +179,7 @@ public final class OzoneClientUtils { return NetUtils.createSocketAddr( host.or(OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" + - port.or(OZONE_SCM_DATANODE_PORT_DEFAULT)); + port.or(OZONE_SCM_DATANODE_PORT_DEFAULT)); } /** @@ -168,7 +187,7 @@ public final class OzoneClientUtils { * Each config value may be absent, or if present in the format * host:port (the :port part is optional). * - * @param conf + * @param conf - Conf * @param keys a list of configuration key names. * * @return first hostname component found from the given keys, or absent. * @throws IllegalArgumentException if any values are not in the 'host' * or host:port format. */ static Optional getHostNameFromConfigKeys( - Configuration conf, String ... keys) { + Configuration conf, String... keys) { for (final String key : keys) { final String value = conf.getTrimmed(key); - if (value != null && !value.isEmpty()) { - String[] splits = value.split(":"); - - if(splits.length < 1 || splits.length > 2) { - throw new IllegalArgumentException( - "Invalid value " + value + " for config key " + key + - ". It should be in 'host' or 'host:port' format"); - } - return Optional.of(splits[0]); + final Optional hostName = getHostName(value); + if (hostName.isPresent()) { + return hostName; } } return Optional.absent(); } + /** + * Gets the hostname or indicates that it is absent. + * @param value host or host:port + * @return hostname + */ + public static Optional getHostName(String value) { + if ((value == null) || value.isEmpty()) { + return Optional.absent(); + } + return Optional.of(HostAndPort.fromString(value).getHostText()); + } + + /** + * Gets the port if there is one, or absent otherwise. + * @param value String in host:port format. + * @return Port + */ + public static Optional getHostPort(String value) { + if ((value == null) || value.isEmpty()) { + return Optional.absent(); + } + int port = HostAndPort.fromString(value).getPortOrDefault(NO_PORT); + if (port == NO_PORT) { + return Optional.absent(); + } else { + return Optional.of(port); + } + } + /** * Retrieve the port number, trying the supplied config keys in order. * Each config value may be absent, or if present in the format * host:port (the :port part is optional). * - * @param conf + * @param conf Conf * @param keys a list of configuration key names. * * @return first port number component found from the given keys, or absent. * @throws IllegalArgumentException if any values are not in the 'host' * or host:port format. */ - static Optional getPortNumberFromConfigKeys( - Configuration conf, String ... keys) { + public static Optional getPortNumberFromConfigKeys( + Configuration conf, String... keys) { for (final String key : keys) { final String value = conf.getTrimmed(key); - if (value != null && !value.isEmpty()) { - String[] splits = value.split(":"); - - if(splits.length < 1 || splits.length > 2) { - throw new IllegalArgumentException( - "Invalid value " + value + " for config key " + key + - ". 
It should be in 'host' or 'host:port' format"); - } - - if (splits.length == 2) { - return Optional.of(Integer.parseInt(splits[1])); - } + final Optional hostPort = getHostPort(value); + if (hostPort.isPresent()) { + return hostPort; } } return Optional.absent(); @@ -259,7 +292,7 @@ public final class OzoneClientUtils { * @return long */ private static long sanitizeUserArgs(long valueTocheck, long baseValue, - long minFactor, long maxFactor) + long minFactor, long maxFactor) throws IllegalArgumentException { if ((valueTocheck >= (baseValue * minFactor)) && (valueTocheck <= (baseValue * maxFactor))) { @@ -270,7 +303,6 @@ public final class OzoneClientUtils { throw new IllegalArgumentException(errMsg); } - /** * Returns the interval in which the heartbeat processor thread runs. * @@ -282,7 +314,6 @@ public final class OzoneClientUtils { OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT); } - /** * Heartbeat Interval - Defines the heartbeat frequency from a datanode to * SCM. @@ -295,7 +326,6 @@ public final class OzoneClientUtils { OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT); } - /** * Get the Stale Node interval, which is used by SCM to flag a datanode as * stale, if the heartbeat from that node has been missing for this duration. @@ -340,7 +370,6 @@ public final class OzoneClientUtils { return staleNodeIntevalMs; } - /** * Gets the interval for dead node flagging. This has to be a value that is * greater than stale node value, and by transitive relation we also know @@ -374,8 +403,42 @@ public final class OzoneClientUtils { * @param conf Configration * @return - int -- Number of HBs to process */ - public static int getMaxHBToProcessPerLoop(Configuration conf){ + public static int getMaxHBToProcessPerLoop(Configuration conf) { return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT); } + + /** + * Timeout value for the RPC from Datanode to SCM, primarily used for + * Heartbeats and container reports. + * + * @param conf - Ozone Config + * @return - Rpc timeout in Milliseconds. + */ + public static long getScmRpcTimeOutInMilliseconds(Configuration conf) { + return conf.getTimeDuration(OZONE_SCM_HEARTBEAT_RPC_TIMEOUT, + OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + } + + /** + * Log Warn interval. + * + * @param conf - Ozone Config + * @return - Log warn interval. + */ + public static int getLogWarnInterval(Configuration conf) { + return conf.getInt(OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT, + OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT); + } + + /** + * returns the Container port. + * @param conf - Conf + * @return port number. 
+ */ + public static int getContainerPort(Configuration conf) { + return conf.getInt(ScmConfigKeys.DFS_CONTAINER_IPC_PORT, ScmConfigKeys + .DFS_CONTAINER_IPC_PORT_DEFAULT); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index ec133aa4b62..746fefea40b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -43,8 +43,8 @@ public final class OzoneConfigKeys { "ozone.trace.enabled"; public static final boolean OZONE_TRACE_ENABLED_DEFAULT = false; - public static final String OZONE_METADATA_DIRS = - "ozone.metadata.dirs"; + public static final String OZONE_CONTAINER_METADATA_DIRS = + "ozone.container.metadata.dirs"; public static final String OZONE_KEY_CACHE = "ozone.key.cache.size"; public static final int OZONE_KEY_CACHE_DEFAULT = 1024; @@ -94,6 +94,54 @@ public final class OzoneConfigKeys { public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT = OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L; + public static final String OZONE_SCM_CONTAINER_THREADS = + "ozone.scm.container.threads"; + public static final int OZONE_SCM_CONTAINER_THREADS_DEFAULT = + Runtime.getRuntime().availableProcessors() * 2; + + public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT = + "ozone.scm.heartbeat.rpc-timeout"; + public static final long OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT = + 100; + + /** + * Defines how frequently we will log a missed heartbeat to a specific + * SCM. In the default case we will write a warning message for each 10 + * sequential heartbeats that we miss to a specific SCM. This is to avoid + * overrunning the log with lots of missed-heartbeat log statements. + */ + public static final String OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT = + "ozone.scm.heartbeat.log.warn.interval.count"; + public static final int OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT = + 10; + + public static final String OZONE_CONTAINER_TASK_WAIT = + "ozone.container.task.wait.seconds"; + public static final long OZONE_CONTAINER_TASK_WAIT_DEFAULT = 5; + + + // The ozone.scm.names key is a set of DNS | DNS:PORT | IP Address | IP:PORT + // entries, written as a comma separated string. e.g. scm1, scm2:8020, 7.7.7.7:7777 + // + // If this key is not specified, datanodes will not be able to find + // SCM. The SCM membership can be dynamic, so this key should contain + // all possible SCM names. Once the SCM leader is discovered datanodes will + // get the right list of SCMs to heartbeat to from the leader. + // While it is good for the datanodes to know the names of all SCM nodes, + // it is sufficient to actually know the name of one working SCM. That SCM + // will be able to return the information about other SCMs that are part of + // the SCM replicated Log. + // + // In case of a membership change, any one of the SCM machines will be + // able to send back a new list to the datanodes. + public static final String OZONE_SCM_NAMES = "ozone.scm.names"; + + public static final int OZONE_SCM_DEFAULT_PORT = 9862; + // File name and path that the datanode ID is to be written to. + // If this value is not set, container startup will fail. + public static final String OZONE_SCM_DATANODE_ID = "ozone.scm.datanode.id"; + + /** * There is no need to instantiate this class. 
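The new keys above are read through the OzoneClientUtils helpers added in this patch. A minimal wiring sketch (not part of the patch; the datanode ID path is illustrative): import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; public final class ScmConfigSketch { public static void main(String[] args) { Configuration conf = new Configuration(); // Comma separated SCM names; host, host:port, IP and IP:PORT all parse. conf.set(OzoneConfigKeys.OZONE_SCM_NAMES, "scm1,scm2:8020,7.7.7.7:7777"); // File that the datanode ID is persisted to; startup fails when unset. conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, "/var/lib/hadoop/ozone/datanode.id"); // Helpers added to OzoneClientUtils by this patch. long rpcTimeoutMs = OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf); int warnEvery = OzoneClientUtils.getLogWarnInterval(conf); System.out.println("SCM RPC timeout " + rpcTimeoutMs + " ms, warn every " + warnEvery + " missed heartbeats"); } }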
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java new file mode 100644 index 00000000000..5574c9fc838 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * State Machine Class. + */ +public class DatanodeStateMachine implements Closeable { + @VisibleForTesting + static final Logger LOG = + LoggerFactory.getLogger(DatanodeStateMachine.class); + private final ExecutorService executorService; + private final Configuration conf; + private final SCMConnectionManager connectionManager; + private final long taskWaitTime; + private final long heartbeatFrequency; + private StateContext context; + + /** + * Constructs a datanode state machine. + * + * @param conf - Configuration. + */ + public DatanodeStateMachine(Configuration conf) { + this.conf = conf; + executorService = HadoopExecutors.newScheduledThreadPool( + this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS, + OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT), + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Container State Machine Thread - %d").build()); + connectionManager = new SCMConnectionManager(conf); + context = new StateContext(this.conf, DatanodeStates.getInitState(), this); + taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT, + OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT); + heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf); + } + + /** + * Returns the Connection manager for this state machine. + * + * @return - SCMConnectionManager. + */ + public SCMConnectionManager getConnectionManager() { + return connectionManager; + } + + /** + * Runs the state machine at a fixed frequency. + */ + public void start() throws IOException { + long now = 0; + long nextHB = 0; + while (context.getState() != DatanodeStates.SHUTDOWN) { + try { + nextHB = Time.monotonicNow() + heartbeatFrequency; + context.execute(executorService, taskWaitTime, TimeUnit.SECONDS); + now = Time.monotonicNow(); + if (now < nextHB) { + Thread.sleep(nextHB - now); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + LOG.error("Unable to finish the execution", e); + } + } + } + + /** + * Gets the current context. + * + * @return StateContext + */ + public StateContext getContext() { + return context; + } + + /** + * Sets the current context. + * + * @param context - Context + */ + public void setContext(StateContext context) { + this.context = context; + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally mark the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Unable to shutdown state machine properly."); + } + } catch (InterruptedException e) { + LOG.error("Error attempting to shutdown.", e); + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + + for (EndpointStateMachine endPoint : connectionManager.getValues()) { + endPoint.close(); + } + } + + /** + * States that a datanode can be in. GetNextState will move this enum from + * getInitState to getLastState. + */ + public enum DatanodeStates { + INIT(1), + RUNNING(2), + SHUTDOWN(3); + private final int value; + + /** + * Constructs DatanodeStates. + * + * @param value - state value. + */ + DatanodeStates(int value) { + this.value = value; + } + + /** + * Returns the first State. + * + * @return First State. + */ + public static DatanodeStates getInitState() { + return INIT; + } + + /** + * The last state of datanode states. + * + * @return last state. + */ + public static DatanodeStates getLastState() { + return SHUTDOWN; + } + + /** + * Returns the numeric value associated with this state. + * + * @return int. + */ + public int getValue() { + return value; + } + + /** + * Returns the next logical state that the datanode should move to. This + * function assumes the states are sequentially numbered. + * + * @return NextState. + */ + public DatanodeStates getNextState() { + if (this.value < getLastState().getValue()) { + int stateValue = this.getValue() + 1; + for (DatanodeStates iter : values()) { + if (stateValue == iter.getValue()) { + return iter; + } + } + } + return getLastState(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java new file mode 100644 index 00000000000..2900a55355f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Endpoint is used as a holder class that keeps state around the RPC endpoint. + */ +public class EndpointStateMachine implements Closeable { + static final Logger + LOG = LoggerFactory.getLogger(EndpointStateMachine.class); + private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint; + private final AtomicLong missedCount; + private final InetSocketAddress address; + private final Lock lock; + private final Configuration conf; + private EndPointStates state; + private VersionResponse version; + + /** + * Constructs RPC Endpoints. + * + * @param address - SCM address. + * @param endPoint - RPC endPoint. + * @param conf - Configuration. + */ + public EndpointStateMachine(InetSocketAddress address, + StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint, + Configuration conf) { + this.endPoint = endPoint; + this.missedCount = new AtomicLong(0); + this.address = address; + state = EndPointStates.getInitState(); + lock = new ReentrantLock(); + this.conf = conf; + } + + /** + * Takes a lock on this EndPoint so that other threads don't use this while we + * are trying to communicate via this endpoint. + */ + public void lock() { + lock.lock(); + } + + /** + * Unlocks this endpoint. + */ + public void unlock() { + lock.unlock(); + } + + /** + * Returns the version that we read from the server if anyone asks. + * + * @return - Version Response. + */ + public VersionResponse getVersion() { + return version; + } + + /** + * Sets the version response we received from the SCM. + * + * @param version VersionResponse + */ + public void setVersion(VersionResponse version) { + this.version = version; + } + + /** + * Returns the current State this end point is in. + * + * @return - the current state. + */ + public EndPointStates getState() { + return state; + } + + /** + * Sets the endpoint state. + * + * @param state - state. + */ + public EndPointStates setState(EndPointStates state) { + this.state = state; + return this.state; + } + + /** + * Closes the connection. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (endPoint != null) { + endPoint.close(); + } + } + + /** + * We maintain a count of how many times we missed communicating with a + * specific SCM. This is not made atomic since the access to this is always + * guarded by the read or write lock. That is, it is serialized. + */ + public void incMissed() { + this.missedCount.incrementAndGet(); + } + + /** + * Returns the value of the missed count. 
+ * + * @return long + */ + public long getMissedCount() { + return this.missedCount.get(); + } + + public void zeroMissedCount() { + this.missedCount.set(0); + } + + /** + * Returns the InetSocketAddress of the endPoint. + * + * @return - InetSocketAddress. + */ + public InetSocketAddress getAddress() { + return this.address; + } + + /** + * Returns real RPC endPoint. + * + * @return rpc client. + */ + public StorageContainerDatanodeProtocolClientSideTranslatorPB + getEndPoint() { + return endPoint; + } + + /** + * Returns the string that represents this endpoint. + * + * @return - String + */ + public String toString() { + return address.toString(); + } + + /** + * Logs exception if needed. + * @param ex - Exception + */ + public void logIfNeeded(Exception ex) { + LOG.trace("Incrementing the Missed count. Ex : {}", ex); + this.incMissed(); + if (this.getMissedCount() % OzoneClientUtils.getLogWarnInterval(conf) == + 0) { + LOG.warn("Unable to communicate to SCM server at {}. We have not been " + + "able to communicate to this SCM server for past {} seconds.", + this.getAddress().getHostString() + ":" + this.getAddress().getPort(), + this.getMissedCount() * OzoneClientUtils.getScmHeartbeatInterval( + this.conf)); + } + } + + + /** + * States that an Endpoint can be in. + *

+ * This is a sorted list of states that EndPoint will traverse. + *

+ * GetNextState will move this enum from getInitState to getLastState. + */ + public enum EndPointStates { + GETVERSION(1), + REGISTER(2), + HEARTBEAT(3), + SHUTDOWN(4); // if you add value after this please edit getLastState too. + private final int value; + + /** + * Constructs endPointStates. + * + * @param value state. + */ + EndPointStates(int value) { + this.value = value; + } + + /** + * Returns the first State. + * + * @return First State. + */ + public static EndPointStates getInitState() { + return GETVERSION; + } + + /** + * The last state of endpoint states. + * + * @return last state. + */ + public static EndPointStates getLastState() { + return SHUTDOWN; + } + + /** + * returns the numeric value associated with the endPoint. + * + * @return int. + */ + public int getValue() { + return value; + } + + /** + * Returns the next logical state that endPoint should move to. + * The next state is computed by adding 1 to the current state. + * + * @return NextState. + */ + public EndPointStates getNextState() { + if (this.getValue() < getLastState().getValue()) { + int stateValue = this.getValue() + 1; + for (EndPointStates iter : values()) { + if (stateValue == iter.getValue()) { + return iter; + } + } + } + return getLastState(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java new file mode 100644 index 00000000000..33d361b8d7d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * SCMConnectionManager - Acts as a class that manages the membership + * information of the SCMs that we are working with. + */ +public class SCMConnectionManager { + private static final Logger LOG = + LoggerFactory.getLogger(SCMConnectionManager.class); + + private final ReadWriteLock mapLock; + private final Map scmMachines; + + private final int rpcTimeout; + private final Configuration conf; + + + public SCMConnectionManager(Configuration conf) { + this.mapLock = new ReentrantReadWriteLock(); + this.rpcTimeout = + (int) OzoneClientUtils.getScmRpcTimeOutInMilliseconds(conf); + this.scmMachines = new HashMap<>(); + this.conf = conf; + } + + /** + * Returns Config. + * + * @return ozoneConfig. + */ + public Configuration getConf() { + return conf; + } + + /** + * Get RPC timeout. + * + * @return - RPC timeout. + */ + public long getRpcTimeout() { + return rpcTimeout; + } + + + /** + * Takes a read lock. + */ + public void readLock() { + this.mapLock.readLock().lock(); + } + + /** + * Releases the read lock. + */ + public void readUnlock() { + this.mapLock.readLock().unlock(); + } + + /** + * Takes the write lock. + */ + public void writeLock() { + this.mapLock.writeLock().lock(); + } + + /** + * Releases the write lock. + */ + public void writeUnlock() { + this.mapLock.writeLock().unlock(); + } + + /** + * Adds a new SCM machine to the target set. + * + * @param address - Address of the SCM machine to send heartbeat to. + * @throws IOException + */ + public void addSCMServer(InetSocketAddress address) throws IOException { + writeLock(); + try { + if (scmMachines.containsKey(address)) { + LOG.warn("Trying to add an existing SCM Machine to Machines group. 
" + + "Ignoring the request."); + return; + } + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); + + StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy( + StorageContainerDatanodeProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout); + + StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = + new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + EndpointStateMachine endPoint = + new EndpointStateMachine(address, rpcClient, conf); + scmMachines.put(address, endPoint); + } finally { + writeUnlock(); + } + } + + /** + * Removes a SCM machine for the target set. + * + * @param address - Address of the SCM machine to send heatbeat to. + * @throws IOException + */ + public void removeSCMServer(InetSocketAddress address) throws IOException { + writeLock(); + try { + if (!scmMachines.containsKey(address)) { + LOG.warn("Trying to remove a non-existent SCM machine. " + + "Ignoring the request."); + return; + } + + EndpointStateMachine endPoint = scmMachines.get(address); + endPoint.close(); + scmMachines.remove(address); + } finally { + writeUnlock(); + } + } + + /** + * Returns all known RPCEndpoints. + * + * @return - List of RPC Endpoints. + */ + public Collection getValues() { + return scmMachines.values(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java new file mode 100644 index 00000000000..0a20945e457 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .RunningDatanodeState; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Current Context of State Machine. + */ +public class StateContext { + private final Queue commandQueue; + private final Lock lock; + private final DatanodeStateMachine parent; + private final AtomicLong stateExecutionCount; + private final Configuration conf; + private DatanodeStateMachine.DatanodeStates state; + + /** + * Constructs a StateContext. + * + * @param conf - Configuration + * @param state - State + * @param parent Parent State Machine + */ + public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates + state, DatanodeStateMachine parent) { + this.conf = conf; + this.state = state; + this.parent = parent; + commandQueue = new LinkedList<>(); + lock = new ReentrantLock(); + stateExecutionCount = new AtomicLong(0); + } + + /** + * Returns the DatanodeStateMachine that holds this state. + * + * @return DatanodeStateMachine. + */ + public DatanodeStateMachine getParent() { + return parent; + } + + /** + * Returns true if we are entering a new state. + * + * @return boolean + */ + boolean isEntering() { + return stateExecutionCount.get() == 0; + } + + /** + * Returns true if we are exiting from the current state. + * + * @param newState - newState. + * @return boolean + */ + boolean isExiting(DatanodeStateMachine.DatanodeStates newState) { + boolean isExiting = state != newState && stateExecutionCount.get() > 0; + if (isExiting) { + stateExecutionCount.set(0); + } + return isExiting; + } + + /** + * Returns the current state the machine is in. + * + * @return state. + */ + public DatanodeStateMachine.DatanodeStates getState() { + return state; + } + + /** + * Sets the current state of the machine. + * + * @param state state. + */ + public void setState(DatanodeStateMachine.DatanodeStates state) { + this.state = state; + } + + /** + * Returns the next task to get executed by the datanode state machine. + * @return A callable that will be executed by the + * {@link DatanodeStateMachine} + */ + @SuppressWarnings("unchecked") + public DatanodeState getTask() { + switch (this.state) { + case INIT: + return new InitDatanodeState(this.conf, parent.getConnectionManager(), + this); + case RUNNING: + return new RunningDatanodeState(this.conf, parent.getConnectionManager(), + this); + case SHUTDOWN: + return null; + default: + throw new IllegalArgumentException("Not Implemented yet."); + } + } + + /** + * Executes the required state function. 
+ * + * @param service - Executor Service + * @param time - time to wait + * @param unit - unit of time. + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + public void execute(ExecutorService service, long time, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + stateExecutionCount.incrementAndGet(); + DatanodeState task = getTask(); + if (this.isEntering()) { + task.onEnter(); + } + task.execute(service); + DatanodeStateMachine.DatanodeStates newState = task.await(time, unit); + if (this.state != newState) { + if (isExiting(newState)) { + task.onExit(); + } + this.setState(newState); + } + } + + /** + * Returns the next command or null if it is empty. + * + * @return SCMCommand or Null. + */ + public SCMCommand getNextCommand() { + lock.lock(); + try { + return commandQueue.poll(); + } finally { + lock.unlock(); + } + } + + /** + * Adds a command to the State Machine queue. + * + * @param command - SCMCommand. + */ + public void addCommand(SCMCommand command) { + lock.lock(); + try { + commandQueue.add(command); + } finally { + lock.unlock(); + } + } + + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java new file mode 100644 index 00000000000..feb2f812ac8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine; +/** + + State machine class is used by the container to denote various states a + container can be in and also is used for command processing. + + Container has the following states. + + Start -> getVersion -> Register -> Running -> Shutdown + + */ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java new file mode 100644 index 00000000000..75142afd10c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.states; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * State Interface that allows tasks to maintain states. + */ +public interface DatanodeState<T> { + /** + * Called before entering this state. + */ + void onEnter(); + + /** + * Called after exiting this state. + */ + void onExit(); + + /** + * Executes one or more tasks that are needed by this state. + * + * @param executor - ExecutorService + */ + void execute(ExecutorService executor); + + /** + * Wait for execute to finish. + * + * @param time - Time + * @param timeUnit - Unit of time. + */ + T await(long time, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java new file mode 100644 index 00000000000..233cac19348 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import com.google.common.base.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Init Datanode State is the task that gets run when we are in Init State. + */ +public class InitDatanodeState implements DatanodeState, + Callable { + static final Logger LOG = LoggerFactory.getLogger(InitDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private Future result; + + /** + * Create InitDatanodeState Task. + * + * @param conf - Conf + * @param connectionManager - Connection Manager + * @param context - Current Context + */ + public InitDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.conf = conf; + this.connectionManager = connectionManager; + this.context = context; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public DatanodeStateMachine.DatanodeStates call() throws Exception { + String[] addresses = conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES); + final Optional defaultPort = Optional.of(OzoneConfigKeys + .OZONE_SCM_DEFAULT_PORT); + + if (addresses == null || addresses.length <= 0) { + LOG.error("SCM addresses need to be a set of valid DNS names " + + "or IP addresses. Null or empty address list found. Aborting " + + "containers."); + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + for (String address : addresses) { + Optional hostname = OzoneClientUtils.getHostName(address); + if (!hostname.isPresent()) { + LOG.error("Invalid hostname for SCM."); + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + Optional port = OzoneClientUtils.getHostPort(address); + InetSocketAddress addr = NetUtils.createSocketAddr(hostname.get(), + port.or(defaultPort.get())); + connectionManager.addSCMServer(addr); + } + return this.context.getState().getNextState(); + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering init container state"); + } + + /** + * Called after exiting this state. 
+ */ + @Override + public void onExit() { + LOG.trace("Exiting init container state"); + } + + /** + * Executes one or more tasks that are needed by this state. + * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + result = executor.submit(this); + } + + /** + * Wait for execute to finish. + * + * @param time - Time + * @param timeUnit - Unit of time. + */ + @Override + public DatanodeStateMachine.DatanodeStates await(long time, + TimeUnit timeUnit) throws InterruptedException, + ExecutionException, TimeoutException { + return result.get(time, timeUnit); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java new file mode 100644 index 00000000000..69eabe658d9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.ozone.OzoneClientUtils; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.endpoint + .HeartbeatEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .RegisterEndpointTask; +import org.apache.hadoop.ozone.container.common.states.endpoint + .VersionEndpointTask; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Class that implements handshake with SCM. + */ +public class RunningDatanodeState implements DatanodeState { + static final Logger + LOG = LoggerFactory.getLogger(RunningDatanodeState.class); + private final SCMConnectionManager connectionManager; + private final Configuration conf; + private final StateContext context; + private CompletionService ecs; + + public RunningDatanodeState(Configuration conf, + SCMConnectionManager connectionManager, + StateContext context) { + this.connectionManager = connectionManager; + this.conf = conf; + this.context = context; + } + + /** + * Reads a datanode ID from the persisted information. + * + * @param idPath - Path to the ID File. + * @return DatanodeID + * @throws IOException + */ + private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + readPersistedDatanodeID(Path idPath) throws IOException { + Preconditions.checkNotNull(idPath); + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + containerIDProto; + try (FileInputStream stream = new FileInputStream(idPath.toFile())) { + containerIDProto = StorageContainerDatanodeProtocolProtos + .ContainerNodeIDProto.parseFrom(stream); + return containerIDProto; + } + } + + /** + * Create a DatanodeID from the datanode information. 
+ * + * @return DatanodeID + * @throws UnknownHostException + */ + private DatanodeID createDatanodeID() throws UnknownHostException { + DatanodeID temp = new DatanodeID( + //TODO : Replace this with proper network and kerberos + // support code. + InetAddress.getLocalHost().getHostAddress().toString(), + DataNode.getHostName(conf), + UUID.randomUUID().toString(), + 0, /** XferPort - SCM does not use this port */ + 0, /** Info port - SCM does not use this port */ + 0, /** Info Secure Port - SCM does not use this port */ + 0); /** IPC port - SCM does not use this port */ + + // TODO: make this dynamically discoverable. SCM can hand out this + // port number to calling applications. This makes it easy to run multiple + // container endpoints on the same machine. + temp.setContainerPort(OzoneClientUtils.getContainerPort(conf)); + return temp; + } + + /** + * Creates a new ContainerID that persists both DatanodeID and ClusterID. + * + * @param idPath Path to the id file. + * @return ContainerNodeIDProto + * @throws IOException + */ + private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + createNewContainerID(Path idPath) + throws IOException { + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + containerIDProto = StorageContainerDatanodeProtocolProtos + .ContainerNodeIDProto.newBuilder() + .setDatanodeID(createDatanodeID().getProtoBufMessage()).build(); + try (FileOutputStream stream = new FileOutputStream(idPath.toFile())) { + stream.write(containerIDProto.toByteArray()); + return containerIDProto; + } + } + + /** + * Returns ContainerNodeIDProto or null in case of error. + * + * @return ContainerNodeIDProto + */ + private StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto + getContainerNodeID() { + String dataNodeIDPath = conf.get(OzoneConfigKeys.OZONE_SCM_DATANODE_ID); + if (dataNodeIDPath == null || dataNodeIDPath.isEmpty()) { + LOG.error("A valid file path is needed for config setting {}", + OzoneConfigKeys.OZONE_SCM_DATANODE_ID); + + // This is an unrecoverable error. + this.context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN); + return null; + } + StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto nodeID; + // try to read an existing ContainerNode ID. + try { + nodeID = readPersistedDatanodeID(Paths.get(dataNodeIDPath)); + if (nodeID != null) { + LOG.trace("Read Node ID: {}", nodeID.getDatanodeID().getDatanodeUuid()); + return nodeID; + } + } catch (IOException ex) { + LOG.trace("Not able to find container Node ID, creating it.", ex); + } + // Not found, let us create a new datanode ID, persist it and return that + // info to SCM. + try { + nodeID = createNewContainerID(Paths.get(dataNodeIDPath)); + LOG.trace("Created Node ID: {}", nodeID.getDatanodeID().getDatanodeUuid()); + return nodeID; + } catch (IOException ex) { + LOG.error("Creating new node ID failed.", ex); + } + return null; + } + + /** + * Called before entering this state. + */ + @Override + public void onEnter() { + LOG.trace("Entering handshake task."); + } + + /** + * Called after exiting this state. + */ + @Override + public void onExit() { + LOG.trace("Exiting handshake task."); + } + + /** + * Executes one or more tasks that are needed by this state.
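The read-or-create flow above boils down to: parse the persisted protobuf if the ID file exists, otherwise mint a fresh identity, write it out, and hand it to SCM. A minimal standalone sketch of the same pattern, using a plain text file and java.util.UUID in place of the ContainerNodeIDProto blob (hypothetical helper, Java 11+; not part of the patch):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class NodeIdFileSketch {
  private NodeIdFileSketch() {
  }

  /** Reuse the persisted ID if present; otherwise create and persist one. */
  static UUID readOrCreate(Path idPath) throws IOException {
    if (Files.exists(idPath)) {
      // Existing ID found: reuse it so the node keeps a stable identity
      // across restarts, just like the datanode ID file above.
      return UUID.fromString(Files.readString(idPath).trim());
    }
    UUID id = UUID.randomUUID();          // first start: mint a new identity
    Files.writeString(idPath, id.toString()); // persist before returning
    return id;
  }
}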
+ * + * @param executor - ExecutorService + */ + @Override + public void execute(ExecutorService executor) { + ecs = new ExecutorCompletionService<>(executor); + for (EndpointStateMachine endpoint : connectionManager.getValues()) { + Callable<EndpointStateMachine.EndPointStates> endpointTask + = getEndPointTask(endpoint); + ecs.submit(endpointTask); + } + } + + private Callable<EndpointStateMachine.EndPointStates> + getEndPointTask(EndpointStateMachine endpoint) { + switch (endpoint.getState()) { + case GETVERSION: + return new VersionEndpointTask(endpoint, conf); + case REGISTER: + return RegisterEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setNodeID(getContainerNodeID()) + .build(); + case HEARTBEAT: + return HeartbeatEndpointTask.newBuilder() + .setConfig(conf) + .setEndpointStateMachine(endpoint) + .setNodeID(getContainerNodeID()) + .build(); + case SHUTDOWN: + break; + default: + throw new IllegalArgumentException("Illegal Argument."); + } + return null; + } + + /** + * Computes the next state the container state machine must move to by looking + * at the state of all endpoints. + *

+ * If any endpoint state has moved to Shutdown, either we have an + * unrecoverable error or we have been told to shut down. In either case the + * datanode state machine should move to the Shutdown state; otherwise we + * remain in the Running state. + * + * @return next container state. + */ + private DatanodeStateMachine.DatanodeStates + computeNextContainerState( + List<Future<EndpointStateMachine.EndPointStates>> results) { + for (Future<EndpointStateMachine.EndPointStates> state : results) { + try { + if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) { + // if any endpoint tells us to shutdown we move to shutdown state. + return DatanodeStateMachine.DatanodeStates.SHUTDOWN; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error in executing end point task.", e); + } + } + return DatanodeStateMachine.DatanodeStates.RUNNING; + } + + /** + * Wait for execute to finish. + * + * @param duration - Time + * @param timeUnit - Unit of duration. + */ + @Override + public DatanodeStateMachine.DatanodeStates + await(long duration, TimeUnit timeUnit) + throws InterruptedException, ExecutionException, TimeoutException { + int count = connectionManager.getValues().size(); + int returned = 0; + long timeLeft = timeUnit.toMillis(duration); + long startTime = Time.monotonicNow(); + List<Future<EndpointStateMachine.EndPointStates>> results = new + LinkedList<>(); + + while (returned < count && timeLeft > 0) { + Future<EndpointStateMachine.EndPointStates> result = + ecs.poll(timeLeft, TimeUnit.MILLISECONDS); + if (result != null) { + results.add(result); + returned++; + } + // Recompute the remaining budget from the start time so elapsed time + // is not double-counted across iterations. + timeLeft = timeUnit.toMillis(duration) - (Time.monotonicNow() - startTime); + } + return computeNextContainerState(results); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java new file mode 100644 index 00000000000..6b8d16c6d39 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
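The await() implementation above is the standard ExecutorCompletionService drain: submit N endpoint tasks, then poll with whatever time remains until all N return or the budget is spent. A self-contained toy version of that polling idiom, with the deadline recomputed each round (illustration only, not part of the patch):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public final class CompletionPollSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
    int count = 3;
    for (int i = 0; i < count; i++) {
      final int n = i;
      ecs.submit(() -> "endpoint-" + n);  // stand-ins for the endpoint tasks
    }
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
    List<Future<String>> done = new ArrayList<>();
    // Drain until every task finishes or the shared deadline passes; each
    // poll waits only for the time remaining in the overall budget.
    while (done.size() < count) {
      long waitMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
      if (waitMs <= 0) {
        break;
      }
      Future<String> f = ecs.poll(waitMs, TimeUnit.MILLISECONDS);
      if (f != null) {
        done.add(f);
      }
    }
    pool.shutdown();
    System.out.println("completed: " + done.size() + "/" + count);
  }
}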

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.datanode; +/** + This package contains files that guide the state transitions from + Init->Running->Shutdown for the datanode. + */ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java new file mode 100644 index 00000000000..4f877ff8d04 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * Heartbeat class for SCMs. + */ +public class HeartbeatEndpointTask + implements Callable<EndpointStateMachine.EndPointStates> { + static final Logger LOG = + LoggerFactory.getLogger(HeartbeatEndpointTask.class); + private final EndpointStateMachine rpcEndpoint; + private final Configuration conf; + private ContainerNodeIDProto containerNodeIDProto; + + /** + * Constructs an SCM heartbeat task. + * + * @param conf Config. + */ + public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint, + Configuration conf) { + this.rpcEndpoint = rpcEndpoint; + this.conf = conf; + } + + /** + * Get the container Node ID proto. + * + * @return ContainerNodeIDProto + */ + public ContainerNodeIDProto getContainerNodeIDProto() { + return containerNodeIDProto; + } + + /** + * Set container node ID proto. + * + * @param containerNodeIDProto - the node id. + */ + public void setContainerNodeIDProto(ContainerNodeIDProto + containerNodeIDProto) { + this.containerNodeIDProto = containerNodeIDProto; + } + + /** + * Computes a result, or throws an exception if unable to do so.
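Each endpoint task's call() follows the same guarded-RPC shape: take the endpoint lock, attempt the RPC, clear the missed-call counter on success, and merely log IOExceptions so the state machine can retry on the next iteration. A minimal sketch of that shape in isolation (hypothetical names, not the patch's EndpointStateMachine API):

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

final class GuardedRpcSketch {
  interface Rpc {
    void invoke() throws IOException;
  }

  private final ReentrantLock lock = new ReentrantLock();
  private final AtomicInteger missedCount = new AtomicInteger();

  void attempt(Rpc rpc) {
    lock.lock();
    try {
      rpc.invoke();
      missedCount.set(0);            // success: reset the failure counter
    } catch (IOException ex) {
      missedCount.incrementAndGet(); // failure: count it, retry next round
    } finally {
      lock.unlock();
    }
  }
}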
+ * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public EndpointStateMachine.EndPointStates call() throws Exception { + rpcEndpoint.lock(); + try { + Preconditions.checkState(this.containerNodeIDProto != null); + DatanodeID datanodeID = DatanodeID.getFromProtoBuf(this + .containerNodeIDProto.getDatanodeID()); + // TODO : Add the command to command processor queue. + rpcEndpoint.getEndPoint().sendHeartbeat(datanodeID); + rpcEndpoint.zeroMissedCount(); + } catch (IOException ex) { + rpcEndpoint.logIfNeeded(ex); + } finally { + rpcEndpoint.unlock(); + } + return rpcEndpoint.getState(); + } + + /** + * Returns a builder class for the HeartbeatEndpointTask. + * @return Builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for HeartbeatEndpointTask. + */ + public static class Builder { + private EndpointStateMachine endPointStateMachine; + private Configuration conf; + private ContainerNodeIDProto containerNodeIDProto; + + /** + * Constructs the builder class. + */ + public Builder() { + } + + /** + * Sets the endpoint state machine. + * + * @param rpcEndPoint - Endpoint state machine. + * @return Builder + */ + public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { + this.endPointStateMachine = rpcEndPoint; + return this; + } + + /** + * Sets the Config. + * + * @param config - config + * @return Builder + */ + public Builder setConfig(Configuration config) { + this.conf = config; + return this; + } + + /** + * Sets the NodeID. + * + * @param nodeID - NodeID proto + * @return Builder + */ + public Builder setNodeID(ContainerNodeIDProto nodeID) { + this.containerNodeIDProto = nodeID; + return this; + } + + public HeartbeatEndpointTask build() { + if (endPointStateMachine == null) { + LOG.error("No endpoint specified."); + throw new IllegalArgumentException("A valid endpoint state machine is" + + " needed to construct a HeartbeatEndpointTask"); + } + + if (conf == null) { + LOG.error("No config specified."); + throw new IllegalArgumentException("A valid configuration is needed to" + + " construct a HeartbeatEndpointTask"); + } + + if (containerNodeIDProto == null) { + LOG.error("No nodeID specified."); + throw new IllegalArgumentException("A valid Node ID is needed to " + + "construct a HeartbeatEndpointTask"); + } + + HeartbeatEndpointTask task = new HeartbeatEndpointTask(this + .endPointStateMachine, this.conf); + task.setContainerNodeIDProto(containerNodeIDProto); + return task; + } + + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java new file mode 100644 index 00000000000..63d19eacf62 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; + +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +/** + * Register a container with SCM. + */ +public final class RegisterEndpointTask implements + Callable<EndpointStateMachine.EndPointStates> { + static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class); + + private final EndpointStateMachine rpcEndPoint; + private final Configuration conf; + private Future<EndpointStateMachine.EndPointStates> result; + private ContainerNodeIDProto containerNodeIDProto; + + /** + * Creates a register endpoint task. + * + * @param rpcEndPoint - endpoint + * @param conf - conf + */ + @VisibleForTesting + public RegisterEndpointTask(EndpointStateMachine rpcEndPoint, + Configuration conf) { + this.rpcEndPoint = rpcEndPoint; + this.conf = conf; + + } + + /** + * Get the ContainerNodeID Proto. + * + * @return ContainerNodeIDProto + */ + public ContainerNodeIDProto getContainerNodeIDProto() { + return containerNodeIDProto; + } + + /** + * Set the containerNodeID Proto. + * + * @param containerNodeIDProto - Container Node ID. + */ + public void setContainerNodeIDProto(ContainerNodeIDProto + containerNodeIDProto) { + this.containerNodeIDProto = containerNodeIDProto; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public EndpointStateMachine.EndPointStates call() throws Exception { + + if (getContainerNodeIDProto() == null) { + LOG.error("Container ID proto cannot be null in RegisterEndpoint task, " + + "shutting down the endpoint."); + return rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN); + } + + rpcEndPoint.lock(); + try { + DatanodeID dnNodeID = DatanodeID.getFromProtoBuf( + getContainerNodeIDProto().getDatanodeID()); + + // TODO : Add responses to the command Queue. + rpcEndPoint.getEndPoint().register(dnNodeID, + conf.getStrings(OzoneConfigKeys.OZONE_SCM_NAMES)); + EndpointStateMachine.EndPointStates nextState = + rpcEndPoint.getState().getNextState(); + rpcEndPoint.setState(nextState); + rpcEndPoint.zeroMissedCount(); + } catch (IOException ex) { + rpcEndPoint.logIfNeeded(ex); + } finally { + rpcEndPoint.unlock(); + } + + return rpcEndPoint.getState(); + } + + /** + * Returns a builder class for RegisterEndPoint task. + * + * @return Builder. + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for RegisterEndPoint task. + */ + public static class Builder { + private EndpointStateMachine endPointStateMachine; + private Configuration conf; + private ContainerNodeIDProto containerNodeIDProto; + + /** + * Constructs the builder class.
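For orientation, a hedged usage sketch of the builder that follows (fragment only; conf, endpoint and nodeIDProto are assumed to exist already, and on success call() advances the endpoint from REGISTER to its next state):

RegisterEndpointTask task = RegisterEndpointTask.newBuilder()
    .setConfig(conf)                    // Hadoop Configuration
    .setEndpointStateMachine(endpoint)  // endpoint currently in REGISTER
    .setNodeID(nodeIDProto)             // persisted ContainerNodeIDProto
    .build();
EndpointStateMachine.EndPointStates next = task.call();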
+ */ + public Builder() { + } + + /** + * Sets the endpoint state machine. + * + * @param rpcEndPoint - Endpoint state machine. + * @return Builder + */ + public Builder setEndpointStateMachine(EndpointStateMachine rpcEndPoint) { + this.endPointStateMachine = rpcEndPoint; + return this; + } + + /** + * Sets the Config. + * + * @param config - config + * @return Builder. + */ + public Builder setConfig(Configuration config) { + this.conf = config; + return this; + } + + /** + * Sets the NodeID. + * + * @param nodeID - NodeID proto + * @return Builder + */ + public Builder setNodeID(ContainerNodeIDProto nodeID) { + this.containerNodeIDProto = nodeID; + return this; + } + + public RegisterEndpointTask build() { + if (endPointStateMachine == null) { + LOG.error("No endpoint specified."); + throw new IllegalArgumentException("A valid endpoint state machine is" + + " needed to construct a RegisterEndpointTask"); + } + + if (conf == null) { + LOG.error("No config specified."); + throw new IllegalArgumentException("A valid configuration is needed to" + + " construct a RegisterEndpointTask"); + } + + if (containerNodeIDProto == null) { + LOG.error("No nodeID specified."); + throw new IllegalArgumentException("A valid Node ID is needed to " + + "construct a RegisterEndpointTask"); + } + + RegisterEndpointTask task = new RegisterEndpointTask(this + .endPointStateMachine, this.conf); + task.setContainerNodeIDProto(containerNodeIDProto); + return task; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java new file mode 100644 index 00000000000..1dfc432e47f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/VersionEndpointTask.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; + +import java.io.IOException; +import java.util.concurrent.Callable; + +/** + * Task that fetches the SCM version. + */ +public class VersionEndpointTask implements + Callable<EndpointStateMachine.EndPointStates> { + private final EndpointStateMachine rpcEndPoint; + private final Configuration configuration; + + public VersionEndpointTask(EndpointStateMachine rpcEndPoint, + Configuration conf) { + this.rpcEndPoint = rpcEndPoint; + this.configuration = conf; + } + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + @Override + public EndpointStateMachine.EndPointStates call() throws Exception { + rpcEndPoint.lock(); + try { + SCMVersionResponseProto versionResponse = + rpcEndPoint.getEndPoint().getVersion(); + rpcEndPoint.setVersion(VersionResponse.getFromProtobuf(versionResponse)); + + EndpointStateMachine.EndPointStates nextState = + rpcEndPoint.getState().getNextState(); + rpcEndPoint.setState(nextState); + rpcEndPoint.zeroMissedCount(); + } catch (IOException ex) { + rpcEndPoint.logIfNeeded(ex); + } finally { + rpcEndPoint.unlock(); + } + return rpcEndPoint.getState(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java new file mode 100644 index 00000000000..112259834dd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.states.endpoint; +/** + This package contains code for RPC endpoint state transitions. + */ \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java new file mode 100644 index 00000000000..92c953ff410 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.states; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index cf1e9bd2fc9..2565a04c695 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -64,8 +64,8 @@ public class OzoneContainer { Configuration ozoneConfig, FsDatasetSpi<?> dataSet) throws Exception { List<StorageLocation> locations = new LinkedList<>(); - String[] paths = ozoneConfig.getStrings(OzoneConfigKeys - .OZONE_METADATA_DIRS); + String[] paths = ozoneConfig.getStrings( + OzoneConfigKeys.OZONE_CONTAINER_METADATA_DIRS); if (paths != null && paths.length > 0) { for (String p : paths) { locations.add(StorageLocation.parse(p)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java new file mode 100644 index 00000000000..86ca9465e38 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership.
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + +import java.io.IOException; + +/** + * The protocol spoken between datanodes and SCM. For specifics please see the + * protoc file that defines this protocol. + */ +@InterfaceAudience.Private +public interface StorageContainerDatanodeProtocol { + /** + * Returns SCM version. + * @return Version info. + */ + SCMVersionResponseProto getVersion() throws IOException; + + /** + * Used by data node to send a Heartbeat. + * @param datanodeID - Datanode ID. + * @return - SCMHeartbeatResponseProto + * @throws IOException + */ + SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) + throws IOException; + + /** + * Register Datanode. + * @param datanodeID - DatanodeID. + * @param scmAddresses - List of SCMs this datanode is configured to + * communicate with. + * @return SCM Command. + */ + SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + String[] scmAddresses) throws IOException; + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java index a44bad51260..4bdf422cc15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.ozone.protocol.commands; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java index e8cc66094ed..f2944ceb797 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -18,15 +18,19 @@ package org.apache.hadoop.ozone.protocol.commands; import com.google.common.base.Preconditions; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode; -import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .ErrorCode; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; /** * Response to Datanode Register call. */ -public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { - +public class RegisteredCommand extends + SCMCommand<SCMRegisteredCmdResponseProto> { private String datanodeUUID; private String clusterID; private ErrorCode error; @@ -38,8 +42,6 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { this.error = error; } - - /** * Returns a new builder. * @@ -56,11 +58,12 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { */ @Override Type getType() { - return Type.registeredCmd; + return Type.registeredCommand; } /** * Returns datanode UUID. + * * @return - Datanode ID. */ public String getDatanodeUUID() { @@ -69,6 +72,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { /** * Returns cluster ID. + * * @return -- ClusterID */ public String getClusterID() { @@ -77,6 +81,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { /** * Returns ErrorCode. + * * @return - ErrorCode */ public ErrorCode getError() { @@ -89,8 +94,8 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { * @return A protobuf message. */ @Override - RegisteredCmdResponseProto getProtoBufMessage() { - return RegisteredCmdResponseProto.newBuilder() + SCMRegisteredCmdResponseProto getProtoBufMessage() { + return SCMRegisteredCmdResponseProto.newBuilder() .setClusterID(this.clusterID) .setDatanodeUUID(this.datanodeUUID) .setErrorCode(this.error) @@ -122,7 +127,7 @@ public class RegisteredCommand extends SCMCommand<RegisteredCmdResponseProto> { * @param response - RegisteredCmdResponseProto * @return RegisteredCommand */ - public RegisteredCommand getFromProtobuf(RegisteredCmdResponseProto + public RegisteredCommand getFromProtobuf(SCMRegisteredCmdResponseProto response) { Preconditions.checkNotNull(response); return new RegisteredCommand(response.getErrorCode(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index d0e3e857849..a6acf4eb7e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.protocol.commands; import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type; + .StorageContainerDatanodeProtocolProtos.Type; import com.google.protobuf.GeneratedMessage; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java new file mode 100644 index 00000000000..e71684c3489 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF)
under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ipc.ProtobufHelper; +import org.apache.hadoop.ipc.ProtocolTranslator; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This class is the client-side translator to translate the requests made on + * the {@link StorageContainerDatanodeProtocol} interface to the RPC server + * implementing {@link StorageContainerDatanodeProtocolPB}. + */ +public class StorageContainerDatanodeProtocolClientSideTranslatorPB + implements StorageContainerDatanodeProtocol, ProtocolTranslator, Closeable { + + /** + * RpcController is not used and hence is set to null. + */ + private static final RpcController NULL_RPC_CONTROLLER = null; + private final StorageContainerDatanodeProtocolPB rpcProxy; + + /** + * Constructs a Client side interface that calls into SCM datanode protocol. + * + * @param rpcProxy - Proxy for RPC. + */ + public StorageContainerDatanodeProtocolClientSideTranslatorPB( + StorageContainerDatanodeProtocolPB rpcProxy) { + this.rpcProxy = rpcProxy; + } + + /** + * Closes this stream and releases any system resources associated with it. If + * the stream is already closed then invoking this method has no effect. + *

+ *

As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful attention. It is strongly advised to relinquish the + * underlying resources and to internally mark the {@code Closeable} + * as closed, prior to throwing the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + RPC.stopProxy(rpcProxy); + } + + /** + * Return the proxy object underlying this protocol translator. + * + * @return the proxy object underlying this protocol translator. + */ + @Override + public Object getUnderlyingProxyObject() { + return rpcProxy; + } + + /** + * Returns SCM version. + * + * @return Version info. + */ + @Override + public SCMVersionResponseProto getVersion() throws IOException { + + SCMVersionRequestProto request = + SCMVersionRequestProto.newBuilder().build(); + final SCMVersionResponseProto response; + try { + response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request); + } catch (ServiceException ex) { + throw ProtobufHelper.getRemoteException(ex); + } + return response; + } + + /** + * Sent by the datanode to SCM. + * + * @param datanodeID - DatanodeID + * @throws IOException + */ + @Override + public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID) + throws IOException { + SCMHeartbeatRequestProto.Builder req = + SCMHeartbeatRequestProto.newBuilder(); + req.setDatanodeID(datanodeID.getProtoBufMessage()); + final SCMHeartbeatResponseProto resp; + try { + resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return resp; + } + + /** + * Register Datanode. + * + * @param datanodeID - DatanodeID. + * @return SCM Command. + */ + @Override + public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID, + String[] scmAddresses) throws IOException { + SCMRegisterRequestProto.Builder req = + SCMRegisterRequestProto.newBuilder(); + req.setDatanodeID(datanodeID.getProtoBufMessage()); + final SCMRegisteredCmdResponseProto response; + try { + response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return response; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java new file mode 100644 index 00000000000..3e6f392799d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService; + +/** + * Protocol used from a datanode to StorageContainerManager. This extends + * the Protocol Buffers service interface to add Hadoop-specific annotations. + */ + +@ProtocolInfo(protocolName = + "org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol", + protocolVersion = 1) +public interface StorageContainerDatanodeProtocolPB extends + StorageContainerDatanodeProtocolService.BlockingInterface { +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java new file mode 100644 index 00000000000..70ad414c8d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.protocolPB; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; + +import java.io.IOException; + +/** + * This class is the server-side translator that forwards requests received on + * {@link StorageContainerDatanodeProtocolPB} to the {@link + * StorageContainerDatanodeProtocol} server implementation. + */ +public class StorageContainerDatanodeProtocolServerSideTranslatorPB + implements StorageContainerDatanodeProtocolPB { + + private final StorageContainerDatanodeProtocol impl; + + public StorageContainerDatanodeProtocolServerSideTranslatorPB( + StorageContainerDatanodeProtocol impl) { + this.impl = impl; + } + + @Override + public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto + getVersion(RpcController controller, + StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request) + throws ServiceException { + try { + return impl.getVersion(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + register(RpcController controller, StorageContainerDatanodeProtocolProtos + .SCMRegisterRequestProto request) throws ServiceException { + String[] addressArray = null; + + if (request.hasAddressList()) { + addressArray = request.getAddressList().getAddressListList() + .toArray(new String[0]); + } + + try { + return impl.register(DatanodeID.getFromProtoBuf(request + .getDatanodeID()), addressArray); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public SCMHeartbeatResponseProto + sendHeartbeat(RpcController controller, + SCMHeartbeatRequestProto request) throws ServiceException { + try { + return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request + .getDatanodeID())); + } catch (IOException e) { + throw new ServiceException(e); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java index eb21eb30334..6bb3a22683c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java @@ -26,6 +26,8 @@ public final class VersionInfo { private final static VersionInfo[] VERSION_INFOS = {new VersionInfo("First version of SCM", 1)}; + + public static final String DESCRIPTION_KEY = "Description"; private final String description; private final int version; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index abf7d52f2c5..3fe70840eb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -29,8 +29,11 @@ import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + .ErrorCode; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.scm.VersionInfo; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -43,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledExecutorService; @@ -580,21 +582,20 @@ public class SCMNodeManager @Override public SCMCommand register(DatanodeID datanodeID) { - SCMCommand errorCode = verifyDatanodeUUID(datanodeID); - if (errorCode != null) { - return errorCode; + SCMCommand responseCommand = verifyDatanodeUUID(datanodeID); + if (responseCommand != null) { + return responseCommand; } - DatanodeID newDatanodeID = new DatanodeID(UUID.randomUUID().toString(), - datanodeID); - nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID); + + nodes.put(datanodeID.getDatanodeUuid(), datanodeID); totalNodes.incrementAndGet(); - healthyNodes.put(newDatanodeID.getDatanodeUuid(), monotonicNow()); + healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow()); healthyNodeCount.incrementAndGet(); LOG.info("Data node with ID: {} Registered.", - newDatanodeID.getDatanodeUuid()); + datanodeID.getDatanodeUuid()); return RegisteredCommand.newBuilder() .setErrorCode(ErrorCode.success) - .setDatanodeUUID(newDatanodeID.getDatanodeUuid()) + .setDatanodeUUID(datanodeID.getDatanodeUuid()) .setClusterID(this.clusterID) .build(); } @@ -607,20 +608,12 @@ public class SCMNodeManager */ private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) { - // Make sure that we return the right error code, so that - // data node can log the correct error. if it is already registered then - // datanode should move to heartbeat state. It implies that somehow we - // have an error where the data node is trying to re-register. - // - // We are going to let the datanode know that there is an error but allow it - // to recover by sending it the right info that is needed for recovery. - if (datanodeID.getDatanodeUuid() != null && nodes.containsKey(datanodeID.getDatanodeUuid())) { - LOG.error("Datanode is already registered. Datanode: {}", + LOG.trace("Datanode is already registered. 
Datanode: {}", datanodeID.toString()); return RegisteredCommand.newBuilder() - .setErrorCode(ErrorCode.errorNodeAlreadyRegistered) + .setErrorCode(ErrorCode.success) .setClusterID(this.clusterID) .setDatanodeUUID(datanodeID.getDatanodeUuid()) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 78e7e743c15..5dea5cc684d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -31,9 +31,13 @@ option java_generic_services = true; option java_generate_equals_and_hash = true; package hadoop.hdfs; + import "hdfs.proto"; + import "HdfsServer.proto"; + import "DatanodeProtocol.proto"; + import "DatanodeContainerProtocol.proto"; @@ -45,6 +49,10 @@ message SCMHeartbeatRequestProto { required DatanodeIDProto datanodeID = 1; } +message SCMRegisterRequestProto { + required DatanodeIDProto datanodeID = 1; + optional SCMNodeAddressList addressList = 2; +} /** * Request for version info of the software stack on the server. @@ -59,24 +67,38 @@ message SCMVersionRequestProto { */ message SCMVersionResponseProto { required uint32 softwareVersion = 1; - repeated hadoop.hdfs.ozone.KeyValue keys = 2; + repeated hadoop.hdfs.ozone.KeyValue keys = 2; +} + +message SCMNodeAddressList { + repeated string addressList = 1; } /** * Datanode ID returned by the SCM. This is similar to name node * registeration of a datanode. */ -message RegisteredCmdResponseProto { +message SCMRegisteredCmdResponseProto { enum ErrorCode { success = 1; - errorNodeAlreadyRegistered = 2; - errorNodeNotPermitted = 3; + errorNodeNotPermitted = 2; } - required ErrorCode errorCode = 1; - optional string datanodeUUID = 2; - optional string clusterID = 3; + required ErrorCode errorCode = 2; + optional string datanodeUUID = 3; + optional string clusterID = 4; + optional SCMNodeAddressList addressList = 5; } +/** + * Container ID maintains the container's Identity along with cluster ID + * after the registration is done. + */ +message ContainerNodeIDProto { + required DatanodeIDProto datanodeID = 1; + optional string clusterID = 2; +} + + /** * Empty Command Response */ @@ -84,18 +106,21 @@ message NullCmdResponseProto { } +/** +Type of commands supported by SCM to datanode protocol. +*/ +enum Type { + nullCmd = 1; + versionCommand = 2; + registeredCommand = 3; +} + /* * These are commands returned by SCM for to the datanode to execute. */ -message SCMCommandResponseProto { - enum Type { - nullCmd = 1; - registeredCmd = 2; // Returns the datanode ID after registeration. - } - - required Type cmdType = 1; // Type of the command - optional NullCmdResponseProto nullCommand = 2; - optional RegisteredCmdResponseProto registerNode = 3; +message SCMCommandResponseProto { + required Type cmdType = 2; // Type of the command + optional NullCmdResponseProto nullCommand = 3; } @@ -160,12 +185,11 @@ message SCMHeartbeatResponseProto { * registered with some SCM. If this file is not found, datanode assumes that * it needs to do a registration. * - * If registration is need datanode moves into REGISTERING_NODE state. It will - * send a register call with datanodeID data structure, but with datanode UUID - * will be set to an empty string. + * If registration is need datanode moves into REGISTER state. 
It will + * send a register call with datanodeID data structure and presist that info. * - * The response to the command contains the datanode UUID and clusterID. This - * information is persisted by the datanode and moves into heartbeat state. + * The response to the command contains clusterID. This information is + * also persisted by the datanode and moves into heartbeat state. * * Once in the heartbeat state, datanode sends heartbeats and container reports * to SCM and process commands issued by SCM until it is shutdown. @@ -176,12 +200,12 @@ service StorageContainerDatanodeProtocolService { /** * Gets the version information from the SCM. */ - rpc getVersion(SCMVersionRequestProto) returns (SCMVersionResponseProto); + rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto); /** * Registers a data node with SCM. */ - rpc register(SCMHeartbeatRequestProto) returns (SCMCommandResponseProto); + rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto); /** * Send heartbeat from datanode to SCM. HB's under SCM looks more diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java new file mode 100644 index 00000000000..c09693b0927 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common; + +import com.google.protobuf.BlockingService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.scm.node.SCMNodeManager; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.Random; +import java.util.UUID; + +/** + * Test utilities for SCM endpoints. + */ +public final class SCMTestUtils { + /** + * Never constructed. + */ + private SCMTestUtils() { + } + + /** + * Starts an RPC server, if configured. + * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param handlerCount RPC server handler count + * @return RPC server + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(Configuration conf, + InetSocketAddress addr, Class<?> + protocol, BlockingService instance, int handlerCount) + throws IOException { + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(addr.getHostString()) + .setPort(addr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + /** + * Creates an Endpoint class for testing purposes.
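A hedged sketch of how the helpers in this class combine in a test (ScmTestMock is the mock SCM defined later in this patch; fragment only, exception handling elided):

Configuration conf = SCMTestUtils.getConf();
ScmTestMock mock = new ScmTestMock();
InetSocketAddress addr = SCMTestUtils.getReuseableAddress();
RPC.Server server = SCMTestUtils.startScmRpcServer(conf, mock, addr, 10);

EndpointStateMachine endpoint = SCMTestUtils.createEndpoint(conf, addr, 1000);
SCMVersionResponseProto version = endpoint.getEndPoint().getVersion();

server.stop();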
+ * + * @param conf - Conf + * @param address - InetAddress + * @param rpcTimeout - RPC timeout + * @return EndPoint + * @throws Exception + */ + public static EndpointStateMachine createEndpoint(Configuration conf, + InetSocketAddress address, int rpcTimeout) throws Exception { + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class); + + StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy( + StorageContainerDatanodeProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout, + RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy(); + + StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient = + new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy); + return new EndpointStateMachine(address, rpcClient, conf); + } + + /** + * Start Datanode RPC server. + */ + public static RPC.Server startScmRpcServer(Configuration configuration, + StorageContainerDatanodeProtocol server, + InetSocketAddress rpcServerAddress, int handlerCount) throws + IOException { + RPC.setProtocolEngine(configuration, + StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService scmDatanodeService = + StorageContainerDatanodeProtocolService. + newReflectiveBlockingService( + new StorageContainerDatanodeProtocolServerSideTranslatorPB( + server)); + + RPC.Server scmServer = startRpcServer(configuration, rpcServerAddress, + StorageContainerDatanodeProtocolPB.class, scmDatanodeService, + handlerCount); + + scmServer.start(); + return scmServer; + } + + public static InetSocketAddress getReuseableAddress() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + int port = socket.getLocalPort(); + String addr = InetAddress.getLoopbackAddress().getHostAddress() + .toString(); + return new InetSocketAddress(addr, port); + } + } + + public static Configuration getConf() { + return new Configuration(); + } + + public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) { + return getDatanodeID(nodeManager, UUID.randomUUID().toString()); + } + + /** + * Create a new DatanodeID with NodeID set to the string. + * + * @param uuid - node ID, it is generally UUID. + * @return DatanodeID. + */ + public static DatanodeID getDatanodeID(SCMNodeManager nodeManager, String + uuid) { + DatanodeID tempDataNode = getDatanodeID(uuid); + RegisteredCommand command = + (RegisteredCommand) nodeManager.register(tempDataNode); + return new DatanodeID(command.getDatanodeUUID(), tempDataNode); + } + + /** + * Get a datanode ID. + * + * @return DatanodeID + */ + public static DatanodeID getDatanodeID() { + return getDatanodeID(UUID.randomUUID().toString()); + } + + private static DatanodeID getDatanodeID(String uuid) { + Random random = new Random(); + String ipAddress = random.nextInt(256) + "." + + random.nextInt(256) + "." + + random.nextInt(256) + "." 
+        + random.nextInt(256);
+
+    String hostName = uuid;
+    return new DatanodeID(ipAddress,
+        hostName, uuid, 0, 0, 0, 0);
+  }
+}
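Taken together, the helpers above are meant to stand up a loopback SCM endpoint for a test. A minimal sketch of that flow, assuming the ScmTestMock class introduced below as the protocol implementation; the class name VersionRoundTripSketch is hypothetical and the sketch lives in the same package as SCMTestUtils (illustrative only, not part of the change):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;

    public class VersionRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = SCMTestUtils.getConf();
        InetSocketAddress addr = SCMTestUtils.getReuseableAddress();
        // Serve StorageContainerDatanodeProtocol on a free loopback port.
        RPC.Server server =
            SCMTestUtils.startScmRpcServer(conf, new ScmTestMock(), addr, 10);
        try (EndpointStateMachine endpoint =
            SCMTestUtils.createEndpoint(conf, addr, 1000)) {
          // One version round trip over the wire.
          System.out.println(endpoint.getEndPoint().getVersion());
        } finally {
          server.stop();
        }
      }
    }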
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
new file mode 100644
index 00000000000..ad805a76d77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.NullCommand;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.scm.VersionInfo;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SCM RPC mock class.
+ */
+public class ScmTestMock implements StorageContainerDatanodeProtocol {
+  private int rpcResponseDelay;
+  private AtomicInteger heartbeatCount = new AtomicInteger(0);
+  private AtomicInteger rpcCount = new AtomicInteger(0);
+
+  /**
+   * Returns the number of heartbeats made to this class.
+   *
+   * @return int
+   */
+  public int getHeartbeatCount() {
+    return heartbeatCount.get();
+  }
+
+  /**
+   * Returns the number of RPC calls made to this mock class instance.
+   *
+   * @return - Number of RPC calls serviced by this class.
+   */
+  public int getRpcCount() {
+    return rpcCount.get();
+  }
+
+  /**
+   * Gets the RPC response delay.
+   *
+   * @return delay in milliseconds.
+   */
+  public int getRpcResponseDelay() {
+    return rpcResponseDelay;
+  }
+
+  /**
+   * Sets the RPC response delay.
+   *
+   * @param rpcResponseDelay - delay in milliseconds.
+   */
+  public void setRpcResponseDelay(int rpcResponseDelay) {
+    this.rpcResponseDelay = rpcResponseDelay;
+  }
+
+  /**
+   * Returns the SCM version.
+   *
+   * @return Version info.
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
+      getVersion() throws IOException {
+    rpcCount.incrementAndGet();
+    sleepIfNeeded();
+    VersionInfo versionInfo = VersionInfo.getLatestVersion();
+    return VersionResponse.newBuilder()
+        .setVersion(versionInfo.getVersion())
+        .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
+        .build().getProtobufMessage();
+  }
+
+  private void sleepIfNeeded() {
+    if (getRpcResponseDelay() > 0) {
+      try {
+        Thread.sleep(getRpcResponseDelay());
+      } catch (InterruptedException ex) {
+        // Restore the interrupt flag; this mock has nothing else to clean up.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Used by the datanode to send a heartbeat.
+   *
+   * @param datanodeID - Datanode ID.
+   * @return - SCMHeartbeatResponseProto
+   * @throws IOException
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+      sendHeartbeat(DatanodeID datanodeID)
+      throws IOException {
+    rpcCount.incrementAndGet();
+    heartbeatCount.incrementAndGet();
+    sleepIfNeeded();
+    StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
+        cmdResponse = StorageContainerDatanodeProtocolProtos
+        .SCMCommandResponseProto
+        .newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos
+            .Type.nullCmd)
+        .setNullCommand(
+            NullCommand.newBuilder().build().getProtoBufMessage()).build();
+    return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+        .newBuilder()
+        .addCommands(cmdResponse).build();
+  }
+
+  /**
+   * Register Datanode.
+   *
+   * @param datanodeID - DatanodeID.
+   * @param scmAddresses - List of SCMs this datanode is configured to
+   *                     communicate with.
+   * @return SCM Command.
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos
+      .SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+      String[] scmAddresses) throws IOException {
+    rpcCount.incrementAndGet();
+    sleepIfNeeded();
+    return StorageContainerDatanodeProtocolProtos
+        .SCMRegisteredCmdResponseProto
+        .newBuilder().setClusterID(UUID.randomUUID().toString())
+        .setDatanodeUUID(datanodeID.getDatanodeUuid()).setErrorCode(
+            StorageContainerDatanodeProtocolProtos
+                .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
+  }
+}
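The rpcResponseDelay knob above is what the timeout tests later in this patch lean on: raising the mock's response latency above the client's RPC timeout makes the call fail on the client side while the elapsed time stays bounded by that timeout. A short sketch of the pattern, with the delay and timeout values being the ones the tests below happen to use:

    ScmTestMock mock = new ScmTestMock();
    mock.setRpcResponseDelay(1500);  // every RPC now sleeps 1.5 s in the mock
    // A client built with a 1000 ms RPC timeout gives up before the mock
    // answers, so getVersion()/register()/sendHeartbeat() throw instead of
    // returning, and the caller can bound the elapsed wall-clock time.
    mock.setRpcResponseDelay(0);     // reset so later tests are not slowed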
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
new file mode 100644
index 00000000000..b75a925e80b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; + +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; + +import org.apache.hadoop.ozone.container.common.states.DatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .InitDatanodeState; +import org.apache.hadoop.ozone.container.common.states.datanode + .RunningDatanodeState; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; +import java.nio.file.Paths; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Tests the datanode state machine class and its states. 
+ */
+public class TestDatanodeStateMachine {
+  private final int scmServerCount = 3;
+  private List<String> serverAddresses;
+  private List<RPC.Server> scmServers;
+  private List<ScmTestMock> mockServers;
+  private ExecutorService executorService;
+  private Configuration conf;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDatanodeStateMachine.class);
+
+  @Before
+  public void setUp() throws Exception {
+    conf = SCMTestUtils.getConf();
+    serverAddresses = new LinkedList<>();
+    scmServers = new LinkedList<>();
+    mockServers = new LinkedList<>();
+    for (int x = 0; x < scmServerCount; x++) {
+      int port = SCMTestUtils.getReuseableAddress().getPort();
+      String address = "127.0.0.1";
+      serverAddresses.add(address + ":" + port);
+      ScmTestMock mock = new ScmTestMock();
+
+      scmServers.add(SCMTestUtils.startScmRpcServer(conf, mock,
+          new InetSocketAddress(address, port), 10));
+      mockServers.add(mock);
+    }
+
+    conf.setStrings(OzoneConfigKeys.OZONE_SCM_NAMES,
+        serverAddresses.toArray(new String[0]));
+
+    URL p = this.getClass().getResource("");
+    String path = p.getPath().concat(
+        TestDatanodeStateMachine.class.getSimpleName());
+    File f = new File(path);
+    if (!f.mkdirs()) {
+      LOG.info("Required directories already exist.");
+    }
+
+    path = Paths.get(path,
+        TestDatanodeStateMachine.class.getSimpleName() + ".id").toString();
+    conf.set(OzoneConfigKeys.OZONE_SCM_DATANODE_ID, path);
+
+    executorService = HadoopExecutors.newScheduledThreadPool(
+        conf.getInt(
+            OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
+            OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Test Data Node State Machine Thread - %d").build());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      executorService.shutdownNow();
+      for (RPC.Server s : scmServers) {
+        s.stop();
+      }
+    } catch (Exception e) {
+      // Ignore all exceptions from the shutdown path.
+    }
+  }
+
+  /**
+   * Asserts that starting the state machine executes the INIT state task.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testDatanodeStateMachineStartThread() throws IOException,
+      InterruptedException, TimeoutException {
+    final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+    Runnable startStateMachineTask = () -> {
+      try {
+        stateMachine.start();
+      } catch (IOException ex) {
+        // Ignored; the assertions below verify the visible side effects.
+      }
+    };
+    Thread thread1 = new Thread(startStateMachineTask);
+    thread1.setDaemon(true);
+    thread1.start();
+
+    SCMConnectionManager connectionManager =
+        stateMachine.getConnectionManager();
+
+    GenericTestUtils.waitFor(
+        () -> connectionManager.getValues().size() == scmServerCount,
+        100, 1000);
+
+    stateMachine.close();
+  }
+
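The start-thread test above waits by polling rather than sleeping: GenericTestUtils.waitFor retries a boolean check at a fixed interval until it holds or the overall wait times out. The same idiom in isolation, assuming a stateMachine started as above and the imports already present in this file:

    // Poll every 100 ms, for at most 1000 ms, until the state machine has
    // built a connection to each of the scmServerCount mock SCM servers.
    GenericTestUtils.waitFor(
        () -> stateMachine.getConnectionManager().getValues().size()
            == scmServerCount,
        100, 1000);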
+  /**
+   * This test explores the state machine by invoking each call in sequence,
+   * just as the state machine itself would. Because this is a test, we are
+   * able to verify each of the assumptions.
+   *
+   * Here is what happens at a high level.
+   *

+ * 1. We start the datanodeStateMachine in the INIT State. + *

+ * 2. We invoke the INIT state task. + *

+ * 3. That creates a set of RPC endpoints that are ready to connect to SCMs. + *

+ * 4. We assert that we have moved to the running state for the + * DatanodeStateMachine. + *

+   * 5. We get the task for the Running State. Executing that running state
+   * makes the first network call of the state machine. The endpoint is in
+   * the GETVERSION state and we invoke the task.
+   *

+   * 6. We assert that this call was a success by checking that each of the
+   * endpoints now has the version response it got from the SCM server it
+   * was talking to, and also that each mock server serviced one RPC call.
+   *
+   * 7. Since the register step is done now, the next calls to getTask will
+   * return HeartbeatTask, which sends heartbeats to SCM. We assert that we
+   * get the right task from the sub-system below.
+   *
+   * @throws IOException
+   */
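The walkthrough above unrolls, by hand, the driver loop that the state machine normally runs itself; the comment in the test about context.execute refers to that loop. A rough sketch of what is being unrolled, using only names from this patch; the real body lives in DatanodeStateMachine, which is not shown in this excerpt, and someTimeout is a placeholder:

    // while (context.getState() != DatanodeStateMachine.DatanodeStates.SHUTDOWN) {
    //   DatanodeState<DatanodeStateMachine.DatanodeStates> task = context.getTask();
    //   task.execute(executorService);
    //   context.setState(task.await(someTimeout, TimeUnit.SECONDS));
    // }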
+  @Test
+  public void testDatanodeStateContext() throws IOException,
+      InterruptedException, ExecutionException, TimeoutException {
+    final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+    DatanodeStateMachine.DatanodeStates currentState =
+        stateMachine.getContext().getState();
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
+        currentState);
+
+    DatanodeState<DatanodeStateMachine.DatanodeStates> task =
+        stateMachine.getContext().getTask();
+    Assert.assertEquals(InitDatanodeState.class, task.getClass());
+
+    task.execute(executorService);
+    DatanodeStateMachine.DatanodeStates newState =
+        task.await(2, TimeUnit.SECONDS);
+
+    for (EndpointStateMachine endpoint :
+        stateMachine.getConnectionManager().getValues()) {
+      // We assert that each of the endpoints is in the GETVERSION state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          endpoint.getState());
+    }
+
+    // The Datanode has moved into Running State, since endpoints are created.
+    // We move to running state when we are ready to issue RPC calls to SCMs.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    // If we had called context.execute instead of calling into each state
+    // this would have happened automatically.
+    stateMachine.getContext().setState(newState);
+    task = stateMachine.getContext().getTask();
+    Assert.assertEquals(RunningDatanodeState.class, task.getClass());
+
+    // This execute will invoke getVersion calls against all SCM endpoints
+    // that we know of.
+    task.execute(executorService);
+    newState = task.await(2, TimeUnit.SECONDS);
+
+    // The state machine should stay in the RUNNING state.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    for (EndpointStateMachine endpoint :
+        stateMachine.getConnectionManager().getValues()) {
+
+      // Since the earlier task.execute called into GetVersion, the
+      // endpoint state machine should move to the REGISTER state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          endpoint.getState());
+
+      // We assert that each of the endpoints has gotten a version from the
+      // SCM server.
+      Assert.assertNotNull(endpoint.getVersion());
+    }
+
+    // We can also assert that all mock servers have received only one RPC
+    // call at this point in time.
+    for (ScmTestMock mock : mockServers) {
+      Assert.assertEquals(1, mock.getRpcCount());
+    }
+
+    // This task is the Running task, but the running task executes tasks
+    // based on the state of the endpoints, hence this next call will be a
+    // Register at the endpoint RPC level.
+    task = stateMachine.getContext().getTask();
+    task.execute(executorService);
+    newState = task.await(2, TimeUnit.SECONDS);
+
+    // The state machine should stay in the RUNNING state.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    for (ScmTestMock mock : mockServers) {
+      Assert.assertEquals(2, mock.getRpcCount());
+    }
+
+    // This task is the Running task, but the running task executes tasks
+    // based on the state of the endpoints, hence this next call will be a
+    // HeartbeatTask at the endpoint RPC level.
+    task = stateMachine.getContext().getTask();
+    task.execute(executorService);
+    newState = task.await(2, TimeUnit.SECONDS);
+
+    // The state machine should stay in the RUNNING state.
+    Assert.assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING,
+        newState);
+
+    for (ScmTestMock mock : mockServers) {
+      Assert.assertEquals(1, mock.getHeartbeatCount());
+    }
+  }
+}
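The three rounds in this test can be read as one loop: every pass through the Running state walks each endpoint one rung up the GETVERSION, REGISTER, HEARTBEAT ladder and then holds it at HEARTBEAT. An equivalent condensed form, assuming the same stateMachine and executorService as above and a state machine already driven into the RUNNING state (a restatement of the test, not additional coverage):

    EndpointStateMachine.EndPointStates[] expected = {
        EndpointStateMachine.EndPointStates.REGISTER,   // after the getVersion round
        EndpointStateMachine.EndPointStates.HEARTBEAT,  // after the register round
        EndpointStateMachine.EndPointStates.HEARTBEAT}; // heartbeats repeat
    for (EndpointStateMachine.EndPointStates want : expected) {
      DatanodeState<DatanodeStateMachine.DatanodeStates> round =
          stateMachine.getContext().getTask();
      round.execute(executorService);
      stateMachine.getContext().setState(round.await(2, TimeUnit.SECONDS));
      for (EndpointStateMachine endpoint :
          stateMachine.getConnectionManager().getValues()) {
        Assert.assertEquals(want, endpoint.getState());
      }
    }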
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
new file mode 100644
index 00000000000..45de6d9a4d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+    .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.scm.VersionInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.matchers.LessOrEqual;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+/**
+ * Tests the endpoints.
+ */
+public class TestEndPoint {
+  private static InetSocketAddress serverAddress;
+  private static RPC.Server scmServer;
+  private static ScmTestMock scmServerImpl;
+
+  /**
+   * This test asserts that we are able to make a version call to the SCM
+   * server and get back the expected values.
+   */
+  @Test
+  public void testGetVersion() throws Exception {
+    try (EndpointStateMachine rpcEndPoint =
+        SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+            serverAddress, 1000)) {
+      SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .getVersion();
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(VersionInfo.DESCRIPTION_KEY,
+          responseProto.getKeys(0).getKey());
+      Assert.assertEquals(VersionInfo.getLatestVersion().getDescription(),
+          responseProto.getKeys(0).getValue());
+    }
+  }
+
+  /**
+   * We make a getVersion RPC call, but via the VersionEndpointTask, which is
+   * how the state machine would make the call.
+   */
+  @Test
+  public void testGetVersionTask() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+        serverAddress, 1000)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // If the version call worked, the endpoint should automatically move
+      // to the next state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          newState);
+
+      // Now rpcEndpoint should remember the version it got from SCM.
+      Assert.assertNotNull(rpcEndPoint.getVersion());
+    }
+  }
+
+  /**
+   * This test makes a call to an endpoint where no SCM server is running.
+   * We expect that the versionTask is able to handle it.
+   */
+  @Test
+  public void testGetVersionToInvalidEndpoint() throws Exception {
+    Configuration conf = SCMTestUtils.getConf();
+    InetSocketAddress nonExistentServerAddress = SCMTestUtils
+        .getReuseableAddress();
+    try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+        nonExistentServerAddress, 1000)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+      // This version call did NOT work, so the endpoint should remain in the
+      // same state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          newState);
+    }
+  }
+
+  /**
+   * This test makes a getVersion RPC call, but the mock SCM server is going
+   * to respond a little slowly. We will assert that we are still in the
+   * GETVERSION state after the timeout.
+   */
+  @Test
+  public void testGetVersionAssertRpcTimeOut() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 100;
+    Configuration conf = SCMTestUtils.getConf();
+
+    try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+        serverAddress, (int) rpcTimeout)) {
+      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+          conf);
+
+      scmServerImpl.setRpcResponseDelay(1500);
+      long start = Time.monotonicNow();
+      EndpointStateMachine.EndPointStates newState = versionTask.call();
+      long end = Time.monotonicNow();
+      scmServerImpl.setRpcResponseDelay(0);
+      Assert.assertThat(end - start,
+          new LessOrEqual<>(rpcTimeout + tolerance));
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+          newState);
+    }
+  }
+
+  @Test
+  public void testRegister() throws Exception {
+    String[] scmAddressArray = new String[1];
+    scmAddressArray[0] = serverAddress.toString();
+    DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID();
+    try (EndpointStateMachine rpcEndPoint =
+        SCMTestUtils.createEndpoint(
+            SCMTestUtils.getConf(), serverAddress, 1000)) {
+      SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .register(nodeToRegister, scmAddressArray);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(nodeToRegister.getDatanodeUuid(),
+          responseProto.getDatanodeUUID());
+      Assert.assertNotNull(responseProto.getClusterID());
+    }
+  }
+
+  private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
+      int rpcTimeout, boolean clearContainerID) throws Exception {
+    EndpointStateMachine rpcEndPoint =
+        SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+            scmAddress, rpcTimeout);
+    rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
+    RegisterEndpointTask endpointTask =
+        new RegisterEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+    if (!clearContainerID) {
+      ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
+          .setClusterID(UUID.randomUUID().toString())
+          .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+          .build();
+      endpointTask.setContainerNodeIDProto(containerNodeID);
+    }
+    endpointTask.call();
+    return rpcEndPoint;
+  }
+
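testGetVersionAssertRpcTimeOut above and testRegisterRpcTimeout below share a single timing idiom: slow the mock down past the client timeout, run the task, and bound the observed wall-clock time. In outline, with values matching the tests in this file and the commented line standing in for whichever endpoint task is under test:

    scmServerImpl.setRpcResponseDelay(1500);   // mock slower than the timeout
    long start = Time.monotonicNow();
    // ... issue the RPC or task call with a 1000 ms client timeout ...
    long end = Time.monotonicNow();
    scmServerImpl.setRpcResponseDelay(0);
    Assert.assertThat(end - start, new LessOrEqual<>(1000L + 200L));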
+  @Test
+  public void testRegisterTask() throws Exception {
+    try (EndpointStateMachine rpcEndpoint =
+        registerTaskHelper(serverAddress, 1000, false)) {
+      // A successful register should move us to the HEARTBEAT state.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterToInvalidEndpoint() throws Exception {
+    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+    try (EndpointStateMachine rpcEndpoint =
+        registerTaskHelper(address, 1000, false)) {
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterNoContainerID() throws Exception {
+    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+    try (EndpointStateMachine rpcEndpoint =
+        registerTaskHelper(address, 1000, true)) {
+      // No container ID, therefore we tell the datanode that we would like
+      // it to shut down.
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testRegisterRpcTimeout() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 200;
+    scmServerImpl.setRpcResponseDelay(1500);
+    long start = Time.monotonicNow();
+    registerTaskHelper(serverAddress, (int) rpcTimeout, false).close();
+    long end = Time.monotonicNow();
+    scmServerImpl.setRpcResponseDelay(0);
+    Assert.assertThat(end - start,
+        new LessOrEqual<>(rpcTimeout + tolerance));
+  }
+
+  @Test
+  public void testHeartbeat() throws Exception {
+    DatanodeID dataNode = SCMTestUtils.getDatanodeID();
+    try (EndpointStateMachine rpcEndPoint =
+        SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+            serverAddress, 1000)) {
+      SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
+          .sendHeartbeat(dataNode);
+      Assert.assertNotNull(responseProto);
+      Assert.assertEquals(1, responseProto.getCommandsCount());
+      Assert.assertNotNull(responseProto.getCommandsList().get(0));
+      Assert.assertEquals(Type.nullCmd,
+          responseProto.getCommandsList().get(0).getCmdType());
+    }
+  }
+
+  private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
+      int rpcTimeout) throws Exception {
+    EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
+        SCMTestUtils.getConf(),
+        scmAddress, rpcTimeout);
+    ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
+        .setClusterID(UUID.randomUUID().toString())
+        .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+        .build();
+    rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
+    HeartbeatEndpointTask endpointTask =
+        new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+    endpointTask.setContainerNodeIDProto(containerNodeID);
+    endpointTask.call();
+    Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
+    return rpcEndPoint;
+  }
+
+  private void heartbeatTaskHelper(InetSocketAddress address)
+      throws Exception {
+    try (EndpointStateMachine rpcEndpoint =
+        heartbeatTaskHelper(address, 1000)) {
+      Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+          rpcEndpoint.getState());
+    }
+  }
+
+  @Test
+  public void testHeartbeatTask() throws Exception {
+    heartbeatTaskHelper(serverAddress);
+  }
+
+  @Test
+  public void testHeartbeatTaskToInvalidNode() throws Exception {
+    InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
+    heartbeatTaskHelper(invalidAddress);
+  }
+
+  @Test
+  public void testHeartbeatTaskRpcTimeOut() throws Exception {
+    final long rpcTimeout = 1000;
+    final long tolerance = 200;
+    scmServerImpl.setRpcResponseDelay(1500);
+    long start = Time.monotonicNow();
+    InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
+    heartbeatTaskHelper(invalidAddress);
+
long end = Time.monotonicNow(); + scmServerImpl.setRpcResponseDelay(0); + Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance)); + } + + @AfterClass + public static void tearDown() throws Exception { + if (scmServer != null) { + scmServer.stop(); + } + } + + @BeforeClass + public static void setUp() throws Exception { + serverAddress = SCMTestUtils.getReuseableAddress(); + scmServerImpl = new ScmTestMock(); + scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(), + scmServerImpl, serverAddress, 10); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index 99287211981..0f028716ac7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; import org.hamcrest.CoreMatchers; import org.junit.Assert; @@ -33,7 +33,6 @@ import org.junit.rules.ExpectedException; import java.io.IOException; import java.util.LinkedList; import java.util.List; -import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -72,37 +71,6 @@ public class TestNodeManager { return new OzoneConfiguration(); } - /** - * Create a new datanode ID. - * - * @return DatanodeID - */ - DatanodeID getDatanodeID(SCMNodeManager nodeManager) { - - return getDatanodeID(nodeManager, UUID.randomUUID().toString()); - } - - /** - * Create a new DatanodeID with NodeID set to the string. - * - * @param uuid - node ID, it is generally UUID. - * @return DatanodeID. - */ - DatanodeID getDatanodeID(SCMNodeManager nodeManager, String uuid) { - Random random = new Random(); - String ipAddress = random.nextInt(256) + "." - + random.nextInt(256) + "." - + random.nextInt(256) + "." - + random.nextInt(256); - - String hostName = uuid; - DatanodeID tempDataNode = new DatanodeID(ipAddress, - hostName, uuid, 0, 0, 0, 0); - RegisteredCommand command = - (RegisteredCommand) nodeManager.register(tempDataNode); - return new DatanodeID(command.getDatanodeUUID(), tempDataNode); - } - /** * Creates a NodeManager. * @@ -134,7 +102,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(getConf())) { // Send some heartbeats from different nodes. for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) { - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanodeID); } @@ -181,7 +149,7 @@ public class TestNodeManager { // Need 100 nodes to come out of chill mode, only one node is sending HB. 
nodeManager.setMinimumChillModeNodes(100); - nodeManager.sendHeartbeat(getDatanodeID(nodeManager)); + nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager)); GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, 4 * 1000); assertFalse("Not enough heartbeat, Node manager should have been in " + @@ -203,7 +171,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(getConf())) { nodeManager.setMinimumChillModeNodes(3); - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); // Send 10 heartbeat from same node, and assert we never leave chill mode. for (int x = 0; x < 10; x++) { @@ -232,7 +200,7 @@ public class TestNodeManager { Configuration conf = getConf(); conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); SCMNodeManager nodeManager = createNodeManager(conf); - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.close(); // These should never be processed. @@ -262,7 +230,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(conf)) { for (int x = 0; x < count; x++) { - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanodeID); } GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100, @@ -346,7 +314,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(conf)) { List nodeList = createNodeSet(nodeManager, nodeCount, "staleNode"); - DatanodeID staleNode = getDatanodeID(nodeManager); + DatanodeID staleNode = SCMTestUtils.getDatanodeID(nodeManager); // Heartbeat once nodeManager.sendHeartbeat(staleNode); @@ -396,7 +364,7 @@ public class TestNodeManager { List nodeList = createNodeSet(nodeManager, nodeCount, "Node"); - DatanodeID deadNode = getDatanodeID(nodeManager); + DatanodeID deadNode = SCMTestUtils.getDatanodeID(nodeManager); // Heartbeat once nodeManager.sendHeartbeat(deadNode); @@ -427,28 +395,6 @@ public class TestNodeManager { } } - /** - * Asserts that if we get duplicate registration calls for a datanode, we will - * ignore it and LOG the error. - * - * @throws IOException - * @throws InterruptedException - * @throws TimeoutException - */ - @Test - public void testScmDuplicateRegistrationLogsError() throws IOException, - InterruptedException, TimeoutException { - try (SCMNodeManager nodeManager = createNodeManager(getConf())) { - GenericTestUtils.LogCapturer logCapturer = - GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG); - DatanodeID duplicateNodeID = getDatanodeID(nodeManager); - nodeManager.register(duplicateNodeID); - logCapturer.stopCapturing(); - assertThat(logCapturer.getOutput(), containsString("Datanode is already" + - " registered.")); - } - } - /** * Asserts that we log an error for null in datanode ID. * @@ -532,9 +478,12 @@ public class TestNodeManager { * Cluster state: Healthy: All nodes are heartbeat-ing like normal. 
*/ try (SCMNodeManager nodeManager = createNodeManager(conf)) { - DatanodeID healthyNode = getDatanodeID(nodeManager, "HealthyNode"); - DatanodeID staleNode = getDatanodeID(nodeManager, "StaleNode"); - DatanodeID deadNode = getDatanodeID(nodeManager, "DeadNode"); + DatanodeID healthyNode = + SCMTestUtils.getDatanodeID(nodeManager, "HealthyNode"); + DatanodeID staleNode = + SCMTestUtils.getDatanodeID(nodeManager, "StaleNode"); + DatanodeID deadNode = + SCMTestUtils.getDatanodeID(nodeManager, "DeadNode"); nodeManager.sendHeartbeat(healthyNode); nodeManager.sendHeartbeat(staleNode); nodeManager.sendHeartbeat(deadNode); @@ -659,7 +608,7 @@ public class TestNodeManager { prefix) { List list = new LinkedList<>(); for (int x = 0; x < count; x++) { - list.add(getDatanodeID(nodeManager, prefix + x)); + list.add(SCMTestUtils.getDatanodeID(nodeManager, prefix + x)); } return list; } @@ -878,7 +827,7 @@ public class TestNodeManager { try (SCMNodeManager nodeManager = createNodeManager(conf)) { nodeManager.setMinimumChillModeNodes(10); - DatanodeID datanodeID = getDatanodeID(nodeManager); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanodeID); String status = nodeManager.getChillModeStatus(); Assert.assertThat(status, CoreMatchers.containsString("Still in chill " + @@ -908,7 +857,7 @@ public class TestNodeManager { // Assert that node manager force enter cannot be overridden by nodes HBs. for(int x= 0; x < 20; x++) { - DatanodeID datanode = getDatanodeID(nodeManager); + DatanodeID datanode = SCMTestUtils.getDatanodeID(nodeManager); nodeManager.sendHeartbeat(datanode); }