diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c1aefe9bfd4..eab19563b8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -873,7 +873,7 @@ public class DataNode extends ReconfigurableBase
   * @throws UnknownHostException if the dfs.datanode.dns.interface
   *         option is used and the hostname can not be determined
   */
- private static String getHostName(Configuration config)
+ public static String getHostName(Configuration config)
      throws UnknownHostException {
    String name = config.get(DFS_DATANODE_HOST_NAME_KEY);
    if (name == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
index 549dc804c54..5d1aed82de2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java
@@ -1,43 +1,61 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
package org.apache.hadoop.ozone;
import com.google.common.base.Optional;
+import com.google.common.net.HostAndPort;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.scm.ScmConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_DEADNODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_LOG_WARN_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_LOG_WARN_INTERVAL_COUNT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_SCM_STALENODE_INTERVAL_MS;
/**
* Utility methods for Ozone and Container Clients.
@@ -51,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERV
public final class OzoneClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(
OzoneClientUtils.class);
+ private static final int NO_PORT = -1;
/**
* The service ID of the solitary Ozone SCM service.
@@ -139,7 +158,7 @@ public final class OzoneClientUtils {
return NetUtils.createSocketAddr(
host.or(OZONE_SCM_CLIENT_BIND_HOST_DEFAULT) + ":" +
- port.or(OZONE_SCM_CLIENT_PORT_DEFAULT));
+ port.or(OZONE_SCM_CLIENT_PORT_DEFAULT));
}
/**
@@ -160,7 +179,7 @@ public final class OzoneClientUtils {
return NetUtils.createSocketAddr(
host.or(OZONE_SCM_DATANODE_BIND_HOST_DEFAULT) + ":" +
- port.or(OZONE_SCM_DATANODE_PORT_DEFAULT));
+ port.or(OZONE_SCM_DATANODE_PORT_DEFAULT));
}
/**
@@ -168,7 +187,7 @@ public final class OzoneClientUtils {
* Each config value may be absent, or if present in the format
* host:port (the :port part is optional).
*
- * @param conf
+ * @param conf - Conf
* @param keys a list of configuration key names.
*
* @return first hostname component found from the given keys, or absent.
@@ -176,51 +195,65 @@ public final class OzoneClientUtils {
* or host:port format.
*/
static Optional
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * State Machine Class.
+ */
+public class DatanodeStateMachine implements Closeable {
+ @VisibleForTesting
+ static final Logger LOG =
+ LoggerFactory.getLogger(DatanodeStateMachine.class);
+ private final ExecutorService executorService;
+ private final Configuration conf;
+ private final SCMConnectionManager connectionManager;
+ private final long taskWaitTime;
+ private final long heartbeatFrequency;
+ private StateContext context;
+
+ /**
+ * Constructs a container state machine.
+ *
+   * @param conf - Configuration.
+ */
+ public DatanodeStateMachine(Configuration conf) {
+ this.conf = conf;
+ executorService = HadoopExecutors.newScheduledThreadPool(
+ this.conf.getInt(OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS,
+ OzoneConfigKeys.OZONE_SCM_CONTAINER_THREADS_DEFAULT),
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Container State Machine Thread - %d").build());
+ connectionManager = new SCMConnectionManager(conf);
+ context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
+ taskWaitTime = this.conf.getLong(OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT,
+ OzoneConfigKeys.OZONE_CONTAINER_TASK_WAIT_DEFAULT);
+ heartbeatFrequency = OzoneClientUtils.getScmHeartbeatInterval(conf);
+ }
+
+ /**
+ * Returns the Connection manager for this state machine.
+ *
+ * @return - SCMConnectionManager.
+ */
+ public SCMConnectionManager getConnectionManager() {
+ return connectionManager;
+ }
+
+ /**
+ * Runs the state machine at a fixed frequency.
+ */
+ public void start() throws IOException {
+ long now = 0;
+ long nextHB = 0;
+ while (context.getState() != DatanodeStates.SHUTDOWN) {
+ try {
+ nextHB = Time.monotonicNow() + heartbeatFrequency;
+ context.execute(executorService, taskWaitTime, TimeUnit.SECONDS);
+ now = Time.monotonicNow();
+ if (now < nextHB) {
+ Thread.sleep(nextHB - now);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("Unable to finish the execution", e);
+ }
+ }
+ }
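A note on the pacing above: nextHB is computed before the task executes, so a slow context.execute() eats into the sleep that follows rather than pushing the heartbeat cadence back; the loop only sleeps when there is time left before the next deadline.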
+
+ /**
+ * Gets the current context.
+ *
+ * @return StateContext
+ */
+ public StateContext getContext() {
+ return context;
+ }
+
+ /**
+ * Sets the current context.
+ *
+ * @param context - Context
+ */
+ public void setContext(StateContext context) {
+ this.context = context;
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated with it. If
+ * the stream is already closed then invoking this method has no effect.
+ *
+ * As noted in {@link AutoCloseable#close()}, cases where the close may
+ * fail require careful attention. It is strongly advised to relinquish the
+ * underlying resources and to internally mark the {@code Closeable}
+ * as closed, prior to throwing the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ executorService.shutdown();
+ try {
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+
+ if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.error("Unable to shutdown statemachine properly.");
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Error attempting to shutdown.", e);
+ executorService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+
+ for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+ endPoint.close();
+ }
+ }
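The close() above is the standard two-phase ExecutorService shutdown idiom: an orderly shutdown(), a bounded wait, a forced shutdownNow() for stragglers, and restoration of the interrupt status if the wait itself is interrupted, before each endpoint is closed.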
+
+ /**
+ * States that a datanode can be in. GetNextState will move this enum from
+ * getInitState to getLastState.
+ */
+ public enum DatanodeStates {
+ INIT(1),
+ RUNNING(2),
+ SHUTDOWN(3);
+ private final int value;
+
+ /**
+     * Constructs DatanodeStates.
+     *
+     * @param value - numeric value of the state.
+ */
+ DatanodeStates(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Returns the first State.
+ *
+ * @return First State.
+ */
+ public static DatanodeStates getInitState() {
+ return INIT;
+ }
+
+ /**
+ * The last state of endpoint states.
+ *
+ * @return last state.
+ */
+ public static DatanodeStates getLastState() {
+ return SHUTDOWN;
+ }
+
+ /**
+     * Returns the numeric value associated with this state.
+ *
+ * @return int.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+     * Returns the next logical state that the datanode should move to. This
+     * function assumes the states are sequentially numbered.
+ *
+ * @return NextState.
+ */
+ public DatanodeStates getNextState() {
+ if (this.value < getLastState().getValue()) {
+ int stateValue = this.getValue() + 1;
+ for (DatanodeStates iter : values()) {
+ if (stateValue == iter.getValue()) {
+ return iter;
+ }
+ }
+ }
+ return getLastState();
+ }
+ }
+}
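For orientation, a minimal usage sketch of the class above (assumed wiring, not part of this patch): the state machine is built from a Configuration, runs until its context reaches SHUTDOWN, and can sit in try-with-resources since it implements Closeable.

    Configuration conf = new Configuration();
    try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf)) {
      stateMachine.start();  // loops until the context reaches SHUTDOWN
    } catch (IOException e) {
      LOG.error("State machine failed.", e);  // hypothetical caller-side logger
    }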
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
new file mode 100644
index 00000000000..2900a55355f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocolPB
+ .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Endpoint is used as holder class that keeps state around the RPC endpoint.
+ */
+public class EndpointStateMachine implements Closeable {
+ static final Logger
+ LOG = LoggerFactory.getLogger(EndpointStateMachine.class);
+ private final StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint;
+ private final AtomicLong missedCount;
+ private final InetSocketAddress address;
+ private final Lock lock;
+ private final Configuration conf;
+ private EndPointStates state;
+ private VersionResponse version;
+
+ /**
+ * Constructs RPC Endpoints.
+ *
+ * @param endPoint - RPC endPoint.
+ */
+ public EndpointStateMachine(InetSocketAddress address,
+ StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint,
+ Configuration conf) {
+ this.endPoint = endPoint;
+ this.missedCount = new AtomicLong(0);
+ this.address = address;
+ state = EndPointStates.getInitState();
+ lock = new ReentrantLock();
+ this.conf = conf;
+ }
+
+ /**
+ * Takes a lock on this EndPoint so that other threads don't use this while we
+ * are trying to communicate via this endpoint.
+ */
+ public void lock() {
+ lock.lock();
+ }
+
+ /**
+ * Unlocks this endpoint.
+ */
+ public void unlock() {
+ lock.unlock();
+ }
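The intended calling pattern for lock()/unlock(), sketched here as an assumption rather than code from this patch, is a try/finally bracket around the RPC exchange so the endpoint is always released:

    endpoint.lock();
    try {
      // communicate with the SCM via endpoint.getEndPoint() here
    } finally {
      endpoint.unlock();
    }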
+
+ /**
+   * Returns the version that we read from the server, if anyone asks.
+ *
+ * @return - Version Response.
+ */
+ public VersionResponse getVersion() {
+ return version;
+ }
+
+ /**
+   * Sets the version response we received from the SCM.
+ *
+ * @param version VersionResponse
+ */
+ public void setVersion(VersionResponse version) {
+ this.version = version;
+ }
+
+ /**
+ * Returns the current State this end point is in.
+ *
+ * @return - getState.
+ */
+ public EndPointStates getState() {
+ return state;
+ }
+
+ /**
+ * Sets the endpoint state.
+ *
+ * @param state - state.
+ */
+ public EndPointStates setState(EndPointStates state) {
+ this.state = state;
+ return this.state;
+ }
+
+ /**
+ * Closes the connection.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void close() throws IOException {
+ if (endPoint != null) {
+ endPoint.close();
+ }
+ }
+
+ /**
+ * We maintain a count of how many times we missed communicating with a
+   * specific SCM. The count is kept in an AtomicLong so that reads and
+   * updates are thread-safe even when the endpoint lock is not held.
+ */
+ public void incMissed() {
+ this.missedCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the value of the missed count.
+ *
+ * @return int
+ */
+ public long getMissedCount() {
+ return this.missedCount.get();
+ }
+
+ public void zeroMissedCount() {
+ this.missedCount.set(0);
+ }
+
+ /**
+   * Returns the InetSocketAddress of the endPoint.
+   *
+   * @return - InetSocketAddress.
+ */
+ public InetSocketAddress getAddress() {
+ return this.address;
+ }
+
+ /**
+ * Returns real RPC endPoint.
+ *
+ * @return rpc client.
+ */
+ public StorageContainerDatanodeProtocolClientSideTranslatorPB
+ getEndPoint() {
+ return endPoint;
+ }
+
+ /**
+ * Returns the string that represents this endpoint.
+ *
+ * @return - String
+ */
+ public String toString() {
+ return address.toString();
+ }
+
+ /**
+ * Logs exception if needed.
+ * @param ex - Exception
+ */
+ public void logIfNeeded(Exception ex) {
+ LOG.trace("Incrementing the Missed count. Ex : {}", ex);
+ this.incMissed();
+ if (this.getMissedCount() % OzoneClientUtils.getLogWarnInterval(conf) ==
+ 0) {
+ LOG.warn("Unable to communicate to SCM server at {}. We have not been " +
+ "able to communicate to this SCM server for past {} seconds.",
+ this.getAddress().getHostString() + ":" + this.getAddress().getPort(),
+ this.getMissedCount() * OzoneClientUtils.getScmHeartbeatInterval(
+ this.conf));
+ }
+ }
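To make the warn cadence concrete (illustrative numbers, not defaults taken from this patch): with getScmHeartbeatInterval(conf) returning 30 seconds and getLogWarnInterval(conf) returning 10, the branch above logs on every 10th consecutive miss, i.e. one warning per roughly 300 seconds of lost contact instead of one per missed heartbeat.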
+
+
+ /**
+ * States that an Endpoint can be in.
+ *
+ * This is a sorted list of states that EndPoint will traverse.
+ *
+ * GetNextState will move this enum from getInitState to getLastState.
+ */
+ public enum EndPointStates {
+ GETVERSION(1),
+ REGISTER(2),
+ HEARTBEAT(3),
+ SHUTDOWN(4); // if you add value after this please edit getLastState too.
+ private final int value;
+
+ /**
+ * Constructs endPointStates.
+ *
+ * @param value state.
+ */
+ EndPointStates(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Returns the first State.
+ *
+ * @return First State.
+ */
+ public static EndPointStates getInitState() {
+ return GETVERSION;
+ }
+
+ /**
+ * The last state of endpoint states.
+ *
+ * @return last state.
+ */
+ public static EndPointStates getLastState() {
+ return SHUTDOWN;
+ }
+
+ /**
+     * Returns the numeric value associated with this endpoint state.
+ *
+ * @return int.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Returns the next logical state that endPoint should move to.
+ * The next state is computed by adding 1 to the current state.
+ *
+ * @return NextState.
+ */
+ public EndPointStates getNextState() {
+ if (this.getValue() < getLastState().getValue()) {
+ int stateValue = this.getValue() + 1;
+ for (EndPointStates iter : values()) {
+ if (stateValue == iter.getValue()) {
+ return iter;
+ }
+ }
+ }
+ return getLastState();
+ }
+ }
+}
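Because getNextState() adds one to the current value and clamps at the last state, the traversal of the enum above looks like this (a hedged sketch, not code from this patch):

    EndpointStateMachine.EndPointStates s =
        EndpointStateMachine.EndPointStates.getInitState();  // GETVERSION
    s = s.getNextState();  // REGISTER
    s = s.getNextState();  // HEARTBEAT
    s = s.getNextState();  // SHUTDOWN
    s = s.getNextState();  // still SHUTDOWN: getNextState() clamps at the last state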
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
new file mode 100644
index 00000000000..33d361b8d7d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.protocolPB
+ .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * SCMConnectionManager - manages the membership
+ * information of the SCMs that this datanode is working with.
+ */
+public class SCMConnectionManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMConnectionManager.class);
+
+ private final ReadWriteLock mapLock;
+ private final Map
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+ .RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Current Context of State Machine.
+ */
+public class StateContext {
+ private final Queue
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Init Datanode State is the task that gets run when we are in Init State.
+ */
+public class InitDatanodeState implements DatanodeState,
+ Callable
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+ .HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+ .RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+ .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Class that implements handshake with SCM.
+ */
+public class RunningDatanodeState implements DatanodeState {
+ static final Logger
+ LOG = LoggerFactory.getLogger(RunningDatanodeState.class);
+ private final SCMConnectionManager connectionManager;
+ private final Configuration conf;
+ private final StateContext context;
+ private CompletionService
+ * if any endpoint state has moved to Shutdown, either we have an
+   * unrecoverable error or we have been told to shutdown. In either case the
+ * datanode state machine should move to Shutdown state, otherwise we
+ * remain in the Running state.
+ *
+ * @return next container state.
+ */
+ private DatanodeStateMachine.DatanodeStates
+ computeNextContainerState(
+ List
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.datanode;
+/**
 This package contains files that guide the state transitions from
+ Init->Running->Shutdown for the datanode.
+ */
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
new file mode 100644
index 00000000000..4f877ff8d04
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Heartbeat class for SCMs.
+ */
+public class HeartbeatEndpointTask
+ implements Callable
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * Register a container with SCM.
+ */
+public final class RegisterEndpointTask implements
+ Callable
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+
+/**
+ * Task that returns version.
+ */
+public class VersionEndpointTask implements
+ Callable
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+/**
 This package contains code for RPC endpoint state transitions.
+ */
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
new file mode 100644
index 00000000000..92c953ff410
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index cf1e9bd2fc9..2565a04c695 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -64,8 +64,8 @@ public class OzoneContainer {
Configuration ozoneConfig,
FsDatasetSpi<? extends FsVolumeSpi> dataSet) throws Exception {
List
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+
+import java.io.IOException;
+
+/**
+ * The protocol spoken between datanodes and SCM. For specifics, please see
+ * the .proto file that defines this protocol.
+ */
+@InterfaceAudience.Private
+public interface StorageContainerDatanodeProtocol {
+ /**
+ * Returns SCM version.
+ * @return Version info.
+ */
+ SCMVersionResponseProto getVersion() throws IOException;
+
+ /**
+ * Used by data node to send a Heartbeat.
+ * @param datanodeID - Datanode ID.
+ * @return - SCMHeartbeatResponseProto
+ * @throws IOException
+ */
+ SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
+ throws IOException;
+
+ /**
+ * Register Datanode.
+   * @param datanodeID - DatanodeID.
+   * @param scmAddresses - List of SCMs this datanode is configured to
+   * communicate with.
+ * @return SCM Command.
+ */
+ SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+ String[] scmAddresses) throws IOException;
+
+}
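As a rough illustration of the call sequence this interface implies for a datanode (an assumed flow; the real drivers are the endpoint tasks elsewhere in this patch):

    // Hedged sketch: one version/register/heartbeat round against an SCM.
    void handshake(StorageContainerDatanodeProtocol scm, DatanodeID datanodeID,
        String[] scmAddresses) throws IOException {
      SCMVersionResponseProto version = scm.getVersion();
      SCMRegisteredCmdResponseProto registered =
          scm.register(datanodeID, scmAddresses);
      SCMHeartbeatResponseProto reply = scm.sendHeartbeat(datanodeID);
    }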
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java
index a44bad51260..4bdf422cc15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/NullCommand.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.ozone.protocol.commands;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.NullCmdResponseProto;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
index e8cc66094ed..f2944ceb797 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java
@@ -18,15 +18,19 @@
package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto.Type;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+ .ErrorCode;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.Type;
/**
* Response to Datanode Register call.
*/
-public class RegisteredCommand extends SCMCommand
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisterRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This class is the client-side translator to translate the requests made on
+ * the {@link StorageContainerDatanodeProtocol} interface to the RPC server
+ * implementing {@link StorageContainerDatanodeProtocolPB}.
+ */
+public class StorageContainerDatanodeProtocolClientSideTranslatorPB
+ implements StorageContainerDatanodeProtocol, ProtocolTranslator, Closeable {
+
+ /**
+ * RpcController is not used and hence is set to null.
+ */
+ private static final RpcController NULL_RPC_CONTROLLER = null;
+ private final StorageContainerDatanodeProtocolPB rpcProxy;
+
+ /**
+ * Constructs a Client side interface that calls into SCM datanode protocol.
+ *
+ * @param rpcProxy - Proxy for RPC.
+ */
+ public StorageContainerDatanodeProtocolClientSideTranslatorPB(
+ StorageContainerDatanodeProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
+ /**
+ * Closes this stream and releases any system resources associated with it. If
+ * the stream is already closed then invoking this method has no effect.
+ *
+ * As noted in {@link AutoCloseable#close()}, cases where the close may
+ * fail require careful attention. It is strongly advised to relinquish the
+ * underlying resources and to internally mark the {@code Closeable}
+ * as closed, prior to throwing the {@code IOException}.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ public void close() throws IOException {
+ RPC.stopProxy(rpcProxy);
+ }
+
+ /**
+ * Return the proxy object underlying this protocol translator.
+ *
+ * @return the proxy object underlying this protocol translator.
+ */
+ @Override
+ public Object getUnderlyingProxyObject() {
+ return rpcProxy;
+ }
+
+ /**
+ * Returns SCM version.
+ *
+ * @return Version info.
+ */
+ @Override
+ public SCMVersionResponseProto getVersion() throws IOException {
+
+ SCMVersionRequestProto request =
+ SCMVersionRequestProto.newBuilder().build();
+ final SCMVersionResponseProto response;
+ try {
+ response = rpcProxy.getVersion(NULL_RPC_CONTROLLER, request);
+ } catch (ServiceException ex) {
+ throw ProtobufHelper.getRemoteException(ex);
+ }
+ return response;
+ }
+
+ /**
+   * Sent by the datanode to SCM.
+ *
+ * @param datanodeID - DatanodeID
+ * @throws IOException
+ */
+
+ @Override
+ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID)
+ throws IOException {
+ SCMHeartbeatRequestProto.Builder req =
+ SCMHeartbeatRequestProto.newBuilder();
+ req.setDatanodeID(datanodeID.getProtoBufMessage());
+ final SCMHeartbeatResponseProto resp;
+ try {
+ resp = rpcProxy.sendHeartbeat(NULL_RPC_CONTROLLER, req.build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return resp;
+ }
+
+ /**
+ * Register Datanode.
+ *
+   * @param datanodeID - DatanodeID.
+ * @return SCM Command.
+ */
+ @Override
+ public SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+ String[] scmAddresses) throws IOException {
+ SCMRegisterRequestProto.Builder req =
+ SCMRegisterRequestProto.newBuilder();
+ req.setDatanodeID(datanodeID.getProtoBufMessage());
+ final SCMRegisteredCmdResponseProto response;
+ try {
+ response = rpcProxy.register(NULL_RPC_CONTROLLER, req.build());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ return response;
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java
new file mode 100644
index 00000000000..3e6f392799d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolPB.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+
+import java.io.IOException;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link StorageContainerDatanodeProtocolPB} to the {@link
+ * StorageContainerDatanodeProtocol} server implementation.
+ */
+public class StorageContainerDatanodeProtocolServerSideTranslatorPB
+ implements StorageContainerDatanodeProtocolPB {
+
+ private final StorageContainerDatanodeProtocol impl;
+
+ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
+ StorageContainerDatanodeProtocol impl) {
+ this.impl = impl;
+ }
+
+ @Override
+ public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
+ getVersion(RpcController controller,
+ StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto request)
+ throws ServiceException {
+ try {
+ return impl.getVersion();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+ register(RpcController controller, StorageContainerDatanodeProtocolProtos
+ .SCMRegisterRequestProto request) throws ServiceException {
+ String[] addressArray = null;
+
+ if (request.hasAddressList()) {
+ addressArray = request.getAddressList().getAddressListList()
+ .toArray(new String[0]);
+ }
+
+ try {
+ return impl.register(DatanodeID.getFromProtoBuf(request
+ .getDatanodeID()), addressArray);
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public SCMHeartbeatResponseProto
+ sendHeartbeat(RpcController controller,
+ SCMHeartbeatRequestProto request) throws ServiceException {
+ try {
+ return impl.sendHeartbeat(DatanodeID.getFromProtoBuf(request
+ .getDatanodeID()));
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java
index eb21eb30334..6bb3a22683c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java
@@ -26,6 +26,8 @@ public final class VersionInfo {
private final static VersionInfo[] VERSION_INFOS =
{new VersionInfo("First version of SCM", 1)};
+
+ public static final String DESCRIPTION_KEY = "Description";
private final String description;
private final int version;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index abf7d52f2c5..3fe70840eb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -29,8 +29,11 @@ import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.RegisteredCmdResponseProto.ErrorCode;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+ .ErrorCode;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.scm.VersionInfo;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -43,7 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
@@ -580,21 +582,20 @@ public class SCMNodeManager
@Override
public SCMCommand register(DatanodeID datanodeID) {
- SCMCommand errorCode = verifyDatanodeUUID(datanodeID);
- if (errorCode != null) {
- return errorCode;
+ SCMCommand responseCommand = verifyDatanodeUUID(datanodeID);
+ if (responseCommand != null) {
+ return responseCommand;
}
- DatanodeID newDatanodeID = new DatanodeID(UUID.randomUUID().toString(),
- datanodeID);
- nodes.put(newDatanodeID.getDatanodeUuid(), newDatanodeID);
+
+ nodes.put(datanodeID.getDatanodeUuid(), datanodeID);
totalNodes.incrementAndGet();
- healthyNodes.put(newDatanodeID.getDatanodeUuid(), monotonicNow());
+ healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
healthyNodeCount.incrementAndGet();
LOG.info("Data node with ID: {} Registered.",
- newDatanodeID.getDatanodeUuid());
+ datanodeID.getDatanodeUuid());
return RegisteredCommand.newBuilder()
.setErrorCode(ErrorCode.success)
- .setDatanodeUUID(newDatanodeID.getDatanodeUuid())
+ .setDatanodeUUID(datanodeID.getDatanodeUuid())
.setClusterID(this.clusterID)
.build();
}
@@ -607,20 +608,12 @@ public class SCMNodeManager
*/
private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) {
- // Make sure that we return the right error code, so that
- // data node can log the correct error. if it is already registered then
- // datanode should move to heartbeat state. It implies that somehow we
- // have an error where the data node is trying to re-register.
- //
- // We are going to let the datanode know that there is an error but allow it
- // to recover by sending it the right info that is needed for recovery.
-
if (datanodeID.getDatanodeUuid() != null &&
nodes.containsKey(datanodeID.getDatanodeUuid())) {
- LOG.error("Datanode is already registered. Datanode: {}",
+ LOG.trace("Datanode is already registered. Datanode: {}",
datanodeID.toString());
return RegisteredCommand.newBuilder()
- .setErrorCode(ErrorCode.errorNodeAlreadyRegistered)
+ .setErrorCode(ErrorCode.success)
.setClusterID(this.clusterID)
.setDatanodeUUID(datanodeID.getDatanodeUuid())
.build();
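The net effect of this hunk: a datanode that re-registers with a UUID the SCM already knows is no longer treated as an error case. It now receives ErrorCode.success along with its existing UUID and the cluster ID, so it can proceed directly to heartbeat state, matching the registration flow described in the .proto changes below.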
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 78e7e743c15..5dea5cc684d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -31,9 +31,13 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdfs;
+
import "hdfs.proto";
+
import "HdfsServer.proto";
+
import "DatanodeProtocol.proto";
+
import "DatanodeContainerProtocol.proto";
@@ -45,6 +49,10 @@ message SCMHeartbeatRequestProto {
required DatanodeIDProto datanodeID = 1;
}
+message SCMRegisterRequestProto {
+ required DatanodeIDProto datanodeID = 1;
+ optional SCMNodeAddressList addressList = 2;
+}
/**
* Request for version info of the software stack on the server.
@@ -59,24 +67,38 @@ message SCMVersionRequestProto {
*/
message SCMVersionResponseProto {
required uint32 softwareVersion = 1;
- repeated hadoop.hdfs.ozone.KeyValue keys = 2;
+ repeated hadoop.hdfs.ozone.KeyValue keys = 2;
+}
+
+message SCMNodeAddressList {
+ repeated string addressList = 1;
}
/**
* Datanode ID returned by the SCM. This is similar to name node
 * registration of a datanode.
*/
-message RegisteredCmdResponseProto {
+message SCMRegisteredCmdResponseProto {
enum ErrorCode {
success = 1;
- errorNodeAlreadyRegistered = 2;
- errorNodeNotPermitted = 3;
+ errorNodeNotPermitted = 2;
}
- required ErrorCode errorCode = 1;
- optional string datanodeUUID = 2;
- optional string clusterID = 3;
+ required ErrorCode errorCode = 2;
+ optional string datanodeUUID = 3;
+ optional string clusterID = 4;
+ optional SCMNodeAddressList addressList = 5;
}
+/**
+ * ContainerNodeIDProto maintains the datanode identity along with the
+ * cluster ID after the registration is done.
+ */
+message ContainerNodeIDProto {
+ required DatanodeIDProto datanodeID = 1;
+ optional string clusterID = 2;
+}
+
+
/**
* Empty Command Response
*/
@@ -84,18 +106,21 @@ message NullCmdResponseProto {
}
+/**
+Type of commands supported by the SCM-to-datanode protocol.
+*/
+enum Type {
+ nullCmd = 1;
+ versionCommand = 2;
+ registeredCommand = 3;
+}
+
/*
* These are commands returned by SCM for to the datanode to execute.
*/
-message SCMCommandResponseProto {
- enum Type {
- nullCmd = 1;
- registeredCmd = 2; // Returns the datanode ID after registeration.
- }
-
- required Type cmdType = 1; // Type of the command
- optional NullCmdResponseProto nullCommand = 2;
- optional RegisteredCmdResponseProto registerNode = 3;
+message SCMCommandResponseProto {
+ required Type cmdType = 2; // Type of the command
+ optional NullCmdResponseProto nullCommand = 3;
}
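With Type hoisted to the top level, a receiver can dispatch on cmdType directly. A hedged Java sketch of such a dispatch (the handler name is hypothetical):

    void dispatch(SCMCommandResponseProto response) {
      switch (response.getCmdType()) {
      case nullCmd:
        handleNullCommand(response.getNullCommand());  // hypothetical handler
        break;
      default:
        break;  // versionCommand / registeredCommand ride in dedicated responses
      }
    }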
@@ -160,12 +185,11 @@ message SCMHeartbeatResponseProto {
* registered with some SCM. If this file is not found, datanode assumes that
* it needs to do a registration.
*
- * If registration is need datanode moves into REGISTERING_NODE state. It will
- * send a register call with datanodeID data structure, but with datanode UUID
- * will be set to an empty string.
+ * If registration is needed, the datanode moves into REGISTER state. It will
+ * send a register call with the datanodeID data structure and persist that info.
*
- * The response to the command contains the datanode UUID and clusterID. This
- * information is persisted by the datanode and moves into heartbeat state.
+ * The response to the command contains the clusterID. This information is
+ * also persisted by the datanode, which then moves into heartbeat state.
*
* Once in the heartbeat state, datanode sends heartbeats and container reports
* to SCM and process commands issued by SCM until it is shutdown.
@@ -176,12 +200,12 @@ service StorageContainerDatanodeProtocolService {
/**
* Gets the version information from the SCM.
*/
- rpc getVersion(SCMVersionRequestProto) returns (SCMVersionResponseProto);
+ rpc getVersion (SCMVersionRequestProto) returns (SCMVersionResponseProto);
/**
* Registers a data node with SCM.
*/
- rpc register(SCMHeartbeatRequestProto) returns (SCMCommandResponseProto);
+ rpc register (SCMRegisterRequestProto) returns (SCMRegisteredCmdResponseProto);
/**
* Send heartbeat from datanode to SCM. HB's under SCM looks more
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
new file mode 100644
index 00000000000..c09693b0927
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import com.google.protobuf.BlockingService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageContainerDatanodeProtocolService;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Test utilities for setting up SCM endpoints and RPC servers.
+ */
+public final class SCMTestUtils {
+ /**
+ * Never constructed.
+ */
+ private SCMTestUtils() {
+ }
+
+ /**
+ * Starts an RPC server, if configured.
+ *
+ * @param conf configuration
+ * @param addr configured address of RPC server
+ * @param protocol RPC protocol provided by RPC server
+ * @param instance RPC protocol implementation instance
+ * @param handlerCount RPC server handler count
+ * @return RPC server
+ * @throws IOException if there is an I/O error while creating RPC server
+ */
+ private static RPC.Server startRpcServer(Configuration conf,
+ InetSocketAddress addr, Class<?> protocol,
+ BlockingService instance, int handlerCount)
+ throws IOException {
+ RPC.Server rpcServer = new RPC.Builder(conf)
+ .setProtocol(protocol)
+ .setInstance(instance)
+ .setBindAddress(addr.getHostString())
+ .setPort(addr.getPort())
+ .setNumHandlers(handlerCount)
+ .setVerbose(false)
+ .setSecretManager(null)
+ .build();
+
+ DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+ return rpcServer;
+ }
+
+ /**
+ * Creates an EndpointStateMachine for testing purposes.
+ *
+ * @param conf - Configuration
+ * @param address - address of the SCM server to connect to
+ * @param rpcTimeout - RPC timeout in milliseconds
+ * @return EndpointStateMachine
+ * @throws Exception if the RPC proxy cannot be created
+ */
+ public static EndpointStateMachine createEndpoint(Configuration conf,
+ InetSocketAddress address, int rpcTimeout) throws Exception {
+ RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long version =
+ RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+
+ StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+ StorageContainerDatanodeProtocolPB.class, version,
+ address, UserGroupInformation.getCurrentUser(), conf,
+ NetUtils.getDefaultSocketFactory(conf), rpcTimeout,
+ RetryPolicies.TRY_ONCE_THEN_FAIL).getProxy();
+
+ StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+ new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
+ return new EndpointStateMachine(address, rpcClient, conf);
+ }
+
+ /**
+ * Starts the SCM-side RPC server that serves the datanode protocol.
+ */
+ public static RPC.Server startScmRpcServer(Configuration configuration,
+ StorageContainerDatanodeProtocol server,
+ InetSocketAddress rpcServerAddress, int handlerCount) throws
+ IOException {
+ RPC.setProtocolEngine(configuration,
+ StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+
+ BlockingService scmDatanodeService =
+ StorageContainerDatanodeProtocolService.
+ newReflectiveBlockingService(
+ new StorageContainerDatanodeProtocolServerSideTranslatorPB(
+ server));
+
+ RPC.Server scmServer = startRpcServer(configuration, rpcServerAddress,
+ StorageContainerDatanodeProtocolPB.class, scmDatanodeService,
+ handlerCount);
+
+ scmServer.start();
+ return scmServer;
+ }
+
+ public static InetSocketAddress getReuseableAddress() throws IOException {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ socket.setReuseAddress(true);
+ int port = socket.getLocalPort();
+ String addr = InetAddress.getLoopbackAddress().getHostAddress();
+ return new InetSocketAddress(addr, port);
+ }
+ }
+
+ public static Configuration getConf() {
+ return new Configuration();
+ }
+
+ public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
+ return getDatanodeID(nodeManager, UUID.randomUUID().toString());
+ }
+
+ /**
+ * Creates a new DatanodeID with the node ID set to the given string,
+ * registers it with the node manager, and returns the registered ID.
+ *
+ * @param nodeManager - node manager to register the new node with.
+ * @param uuid - node ID, it is generally a UUID string.
+ * @return DatanodeID.
+ */
+ public static DatanodeID getDatanodeID(SCMNodeManager nodeManager, String
+ uuid) {
+ DatanodeID tempDataNode = getDatanodeID(uuid);
+ RegisteredCommand command =
+ (RegisteredCommand) nodeManager.register(tempDataNode);
+ return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
+ }
+
+ /**
+ * Get a datanode ID.
+ *
+ * @return DatanodeID
+ */
+ public static DatanodeID getDatanodeID() {
+ return getDatanodeID(UUID.randomUUID().toString());
+ }
+
+ private static DatanodeID getDatanodeID(String uuid) {
+ Random random = new Random();
+ String ipAddress = random.nextInt(256) + "."
+ + random.nextInt(256) + "."
+ + random.nextInt(256) + "."
+ + random.nextInt(256);
+
+ String hostName = uuid;
+ return new DatanodeID(ipAddress,
+ hostName, uuid, 0, 0, 0, 0);
+ }
+}
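
Taken together, these helpers are enough to stand up a loopback SCM and talk to it. A minimal usage sketch, assuming the ScmTestMock class added in the next file of this patch and following the same pattern as TestEndPoint below:

```java
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.ScmTestMock;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;

public final class SCMTestUtilsUsage {
  private SCMTestUtilsUsage() {
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = SCMTestUtils.getConf();
    InetSocketAddress scmAddress = SCMTestUtils.getReuseableAddress();
    // Serve StorageContainerDatanodeProtocol from the mock SCM.
    RPC.Server server = SCMTestUtils.startScmRpcServer(conf,
        new ScmTestMock(), scmAddress, 10);
    try (EndpointStateMachine endpoint =
             SCMTestUtils.createEndpoint(conf, scmAddress, 1000)) {
      // getVersion goes over the wire to the mock server.
      SCMVersionResponseProto version = endpoint.getEndPoint().getVersion();
      System.out.println("SCM says: " + version.getKeys(0).getValue());
    } finally {
      server.stop();
    }
  }
}
```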
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
new file mode 100644
index 00000000000..ad805a76d77
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.NullCommand;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.scm.VersionInfo;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * SCM RPC mock class.
+ */
+public class ScmTestMock implements StorageContainerDatanodeProtocol {
+ private int rpcResponseDelay;
+ private AtomicInteger heartbeatCount = new AtomicInteger(0);
+ private AtomicInteger rpcCount = new AtomicInteger(0);
+
+ /**
+ * Returns the number of heartbeats made to this class.
+ *
+ * @return int
+ */
+ public int getHeartbeatCount() {
+ return heartbeatCount.get();
+ }
+
+ /**
+ * Returns the number of RPC calls made to this mock class instance.
+ *
+ * @return - Number of RPC calls serviced by this class.
+ */
+ public int getRpcCount() {
+ return rpcCount.get();
+ }
+
+ /**
+ * Gets the RPC response delay.
+ *
+ * @return delay in milliseconds.
+ */
+ public int getRpcResponseDelay() {
+ return rpcResponseDelay;
+ }
+
+ /**
+ * Sets the RPC response delay.
+ *
+ * @param rpcResponseDelay - delay in milliseconds.
+ */
+ public void setRpcResponseDelay(int rpcResponseDelay) {
+ this.rpcResponseDelay = rpcResponseDelay;
+ }
+
+ /**
+ * Returns SCM version.
+ *
+ * @return Version info.
+ */
+ @Override
+ public StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto
+ getVersion() throws IOException {
+ rpcCount.incrementAndGet();
+ sleepIfNeeded();
+ VersionInfo versionInfo = VersionInfo.getLatestVersion();
+ return VersionResponse.newBuilder()
+ .setVersion(versionInfo.getVersion())
+ .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
+ .build().getProtobufMessage();
+ }
+
+ private void sleepIfNeeded() {
+ if (getRpcResponseDelay() > 0) {
+ try {
+ Thread.sleep(getRpcResponseDelay());
+ } catch (InterruptedException ex) {
+ // Just ignore this exception.
+ }
+ }
+ }
+
+ /**
+ * Used by the datanode to send a heartbeat.
+ *
+ * @param datanodeID - Datanode ID.
+ * @return - SCMHeartbeatResponseProto
+ * @throws IOException
+ */
+ @Override
+ public StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+ sendHeartbeat(DatanodeID datanodeID)
+ throws IOException {
+ rpcCount.incrementAndGet();
+ heartbeatCount.incrementAndGet();
+ sleepIfNeeded();
+ StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto
+ cmdResponse = StorageContainerDatanodeProtocolProtos
+ .SCMCommandResponseProto
+ .newBuilder().setCmdType(StorageContainerDatanodeProtocolProtos
+ .Type.nullCmd)
+ .setNullCommand(
+ NullCommand.newBuilder().build().getProtoBufMessage()).build();
+ return StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto
+ .newBuilder()
+ .addCommands(cmdResponse).build();
+ }
+
+ /**
+ * Register Datanode.
+ *
+ * @param datanodeID - DatanodeID.
+ * @param scmAddresses - List of SCMs this datanode is configured to
+ * communicate with.
+ * @return SCM Command.
+ */
+ @Override
+ public StorageContainerDatanodeProtocolProtos
+ .SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
+ String[] scmAddresses) throws IOException {
+ rpcCount.incrementAndGet();
+ sleepIfNeeded();
+ return StorageContainerDatanodeProtocolProtos
+ .SCMRegisteredCmdResponseProto
+ .newBuilder().setClusterID(UUID.randomUUID().toString())
+ .setDatanodeUUID(datanodeID.getDatanodeUuid()).setErrorCode(
+ StorageContainerDatanodeProtocolProtos
+ .SCMRegisteredCmdResponseProto.ErrorCode.success).build();
+ }
+}
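
The response-delay knob is what the RPC-timeout tests below rely on. A sketch of that pattern, reusing the helpers above; the expected state transition is the one asserted by the tests in this patch:

```java
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.container.common.ScmTestMock;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;

public final class SlowScmSketch {
  private SlowScmSketch() {
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = SCMTestUtils.getConf();
    InetSocketAddress address = SCMTestUtils.getReuseableAddress();
    ScmTestMock mock = new ScmTestMock();
    RPC.Server server = SCMTestUtils.startScmRpcServer(conf, mock, address, 10);
    try (EndpointStateMachine endpoint =
             SCMTestUtils.createEndpoint(conf, address, 1000)) {
      // A 1500 ms server delay against a 1000 ms client timeout: the call
      // fails and the endpoint must not advance past GETVERSION.
      mock.setRpcResponseDelay(1500);
      endpoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
      EndpointStateMachine.EndPointStates state =
          new VersionEndpointTask(endpoint, conf).call();
      mock.setRpcResponseDelay(0);
      if (state != EndpointStateMachine.EndPointStates.GETVERSION) {
        throw new AssertionError("expected to stay in GETVERSION");
      }
    } finally {
      server.stop();
    }
  }
}
```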
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
new file mode 100644
index 00000000000..b75a925e80b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .DatanodeStateMachine;
+
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+ .InitDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+ .RunningDatanodeState;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests the datanode state machine class and its states.
+ */
+public class TestDatanodeStateMachine {
+ private final int scmServerCount = 3;
+ private List<String> serverAddresses;
+ private Configuration conf;
+
+ /**
+ * Here is what happens at High level.
+ *
+ * 1. We start the datanodeStateMachine in the INIT State.
+ *
+ * 2. We invoke the INIT state task.
+ *
+ * 3. That creates a set of RPC endpoints that are ready to connect to SCMs.
+ *
+ * 4. We assert that we have moved to the running state for the
+ * DatanodeStateMachine.
+ *
+ * 5. We get the task for the Running State - executing that running state
+ * makes the first network call of the state machine. The Endpoint is in
+ * the GETVERSION state and we invoke the task.
+ *
+ * 6. We assert that this call was a success by checking that each of the
+ * endpoints now has the version response it got from the SCM server it
+ * was talking to, and that each of the mock servers serviced one RPC call.
+ *
+ * 7. Since the register is done now, the next calls to getTask will return
+ * HeartbeatTask, which sends heartbeats to SCM. We assert that we get the
+ * right task from the sub-system below.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testDatanodeStateContext() throws IOException,
+ InterruptedException, ExecutionException, TimeoutException {
+ final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
+ DatanodeStateMachine.DatanodeStates currentState =
+ stateMachine.getContext().getState();
+ Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
+ currentState);
+
+ DatanodeState<DatanodeStateMachine.DatanodeStates> task =
+ stateMachine.getContext().getTask();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
new file mode 100644
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+ .HeartbeatEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+ .RegisterEndpointTask;
+import org.apache.hadoop.ozone.container.common.states.endpoint
+ .VersionEndpointTask;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.scm.VersionInfo;
+import org.apache.hadoop.util.Time;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.matchers.LessOrEqual;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+
+/**
+ * Tests the endpoints.
+ */
+public class TestEndPoint {
+ private static InetSocketAddress serverAddress;
+ private static RPC.Server scmServer;
+ private static ScmTestMock scmServerImpl;
+
+ @Test
+ /**
+ * This test asserts that we are able to make a version call to SCM server
+ * and gets back the expected values.
+ */
+ public void testGetVersion() throws Exception {
+ try (EndpointStateMachine rpcEndPoint =
+ SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+ serverAddress, 1000)) {
+ SCMVersionResponseProto responseProto = rpcEndPoint.getEndPoint()
+ .getVersion();
+ Assert.assertNotNull(responseProto);
+ Assert.assertEquals(responseProto.getKeys(0).getKey(),
+ VersionInfo.DESCRIPTION_KEY);
+ Assert.assertEquals(responseProto.getKeys(0).getValue(),
+ VersionInfo.getLatestVersion().getDescription());
+ }
+ }
+
+ @Test
+ /**
+ * We make getVersion RPC call, but via the VersionEndpointTask which is
+ * how the state machine would make the call.
+ */
+ public void testGetVersionTask() throws Exception {
+ Configuration conf = SCMTestUtils.getConf();
+ try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+ serverAddress, 1000)) {
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+ VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+ conf);
+ EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+ // if version call worked the endpoint should automatically move to the
+ // next state.
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+ newState);
+
+ // Now rpcEndpoint should remember the version it got from SCM
+ Assert.assertNotNull(rpcEndPoint.getVersion());
+ }
+ }
+
+ @Test
+ /**
+ * This test makes a call to an endpoint where there is no SCM server. We
+ * expect the versionTask to handle it gracefully.
+ */
+ public void testGetVersionToInvalidEndpoint() throws Exception {
+ Configuration conf = SCMTestUtils.getConf();
+ InetSocketAddress nonExistentServerAddress = SCMTestUtils
+ .getReuseableAddress();
+ try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+ nonExistentServerAddress, 1000)) {
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+ VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+ conf);
+ EndpointStateMachine.EndPointStates newState = versionTask.call();
+
+ // This version call did NOT work, so endpoint should remain in the same
+ // state.
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+ newState);
+ }
+ }
+
+ @Test
+ /**
+ * This test makes a getVersion RPC call, but the mock SCM server is
+ * going to respond a little slowly. We will assert that we are still in the
+ * GETVERSION state after the timeout.
+ */
+ public void testGetVersionAssertRpcTimeOut() throws Exception {
+ final long rpcTimeout = 1000;
+ final long tolerance = 100;
+ Configuration conf = SCMTestUtils.getConf();
+
+ try (EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(conf,
+ serverAddress, (int) rpcTimeout)) {
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
+ VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
+ conf);
+
+ scmServerImpl.setRpcResponseDelay(1500);
+ long start = Time.monotonicNow();
+ EndpointStateMachine.EndPointStates newState = versionTask.call();
+ long end = Time.monotonicNow();
+ scmServerImpl.setRpcResponseDelay(0);
+ Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.GETVERSION,
+ newState);
+ }
+ }
+
+ @Test
+ public void testRegister() throws Exception {
+ String[] scmAddressArray = new String[1];
+ scmAddressArray[0] = serverAddress.toString();
+ DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID();
+ try (EndpointStateMachine rpcEndPoint =
+ SCMTestUtils.createEndpoint(
+ SCMTestUtils.getConf(), serverAddress, 1000)) {
+ SCMRegisteredCmdResponseProto responseProto = rpcEndPoint.getEndPoint()
+ .register(nodeToRegister, scmAddressArray);
+ Assert.assertNotNull(responseProto);
+ Assert.assertEquals(responseProto.getDatanodeUUID(),
+ nodeToRegister.getDatanodeUuid());
+ Assert.assertNotNull(responseProto.getClusterID());
+ }
+ }
+
+ private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
+ int rpcTimeout, boolean clearContainerID) throws Exception {
+ EndpointStateMachine rpcEndPoint =
+ SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+ scmAddress, rpcTimeout);
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
+ RegisterEndpointTask endpointTask =
+ new RegisterEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+ if (!clearContainerID) {
+ ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
+ .setClusterID(UUID.randomUUID().toString())
+ .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+ .build();
+ endpointTask.setContainerNodeIDProto(containerNodeID);
+ }
+ endpointTask.call();
+ return rpcEndPoint;
+ }
+
+ @Test
+ public void testRegisterTask() throws Exception {
+ try (EndpointStateMachine rpcEndpoint =
+ registerTaskHelper(serverAddress, 1000, false)) {
+ // Successful register should move us to Heartbeat state.
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+ rpcEndpoint.getState());
+ }
+ }
+
+ @Test
+ public void testRegisterToInvalidEndpoint() throws Exception {
+ InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+ try (EndpointStateMachine rpcEndpoint =
+ registerTaskHelper(address, 1000, false)) {
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
+ rpcEndpoint.getState());
+ }
+ }
+
+ @Test
+ public void testRegisterNoContainerID() throws Exception {
+ InetSocketAddress address = SCMTestUtils.getReuseableAddress();
+ try (EndpointStateMachine rpcEndpoint =
+ registerTaskHelper(address, 1000, true)) {
+ // No container ID, therefore we tell the datanode that we would like
+ // it to shut down.
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
+ rpcEndpoint.getState());
+ }
+ }
+
+ @Test
+ public void testRegisterRpcTimeout() throws Exception {
+ final long rpcTimeout = 1000;
+ final long tolerance = 200;
+ scmServerImpl.setRpcResponseDelay(1500);
+ long start = Time.monotonicNow();
+ registerTaskHelper(serverAddress, 1000, false).close();
+ long end = Time.monotonicNow();
+ scmServerImpl.setRpcResponseDelay(0);
+ Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
+ }
+
+ @Test
+ public void testHeartbeat() throws Exception {
+ DatanodeID dataNode = SCMTestUtils.getDatanodeID();
+ try (EndpointStateMachine rpcEndPoint =
+ SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
+ serverAddress, 1000)) {
+ SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
+ .sendHeartbeat(dataNode);
+ Assert.assertNotNull(responseProto);
+ Assert.assertEquals(1, responseProto.getCommandsCount());
+ Assert.assertNotNull(responseProto.getCommandsList().get(0));
+ Assert.assertEquals(responseProto.getCommandsList().get(0).getCmdType(),
+ Type.nullCmd);
+ }
+ }
+
+ private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
+ int rpcTimeout) throws Exception {
+ EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
+ SCMTestUtils.getConf(),
+ scmAddress, rpcTimeout);
+ ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
+ .setClusterID(UUID.randomUUID().toString())
+ .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+ .build();
+ rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
+ HeartbeatEndpointTask endpointTask =
+ new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
+ endpointTask.setContainerNodeIDProto(containerNodeID);
+ endpointTask.call();
+ Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
+ return rpcEndPoint;
+ }
+
+ private void heartbeatTaskHelper(InetSocketAddress address)
+ throws Exception {
+ try (EndpointStateMachine rpcEndpoint =
+ heartbeatTaskHelper(address, 1000)) {
+ Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
+ rpcEndpoint.getState());
+ }
+ }
+
+ @Test
+ public void testHeartbeatTask() throws Exception {
+ heartbeatTaskHelper(serverAddress);
+ }
+
+ @Test
+ public void testHeartbeatTaskToInvalidNode() throws Exception {
+ InetSocketAddress invalidAddress = SCMTestUtils.getReuseableAddress();
+ heartbeatTaskHelper(invalidAddress);
+ }
+
+ @Test
+ public void testHeartbeatTaskRpcTimeOut() throws Exception {
+ final long rpcTimeout = 1000;
+ final long tolerance = 200;
+ scmServerImpl.setRpcResponseDelay(1500);
+ long start = Time.monotonicNow();
+ heartbeatTaskHelper(serverAddress);
+ long end = Time.monotonicNow();
+ scmServerImpl.setRpcResponseDelay(0);
+ Assert.assertThat(end - start, new LessOrEqual<>(rpcTimeout + tolerance));
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (scmServer != null) {
+ scmServer.stop();
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ serverAddress = SCMTestUtils.getReuseableAddress();
+ scmServerImpl = new ScmTestMock();
+ scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
+ scmServerImpl, serverAddress, 10);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java
index 99287211981..0f028716ac7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
-import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
@@ -33,7 +33,6 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
@@ -72,37 +71,6 @@ public class TestNodeManager {
return new OzoneConfiguration();
}
- /**
- * Create a new datanode ID.
- *
- * @return DatanodeID
- */
- DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
-
- return getDatanodeID(nodeManager, UUID.randomUUID().toString());
- }
-
- /**
- * Create a new DatanodeID with NodeID set to the string.
- *
- * @param uuid - node ID, it is generally UUID.
- * @return DatanodeID.
- */
- DatanodeID getDatanodeID(SCMNodeManager nodeManager, String uuid) {
- Random random = new Random();
- String ipAddress = random.nextInt(256) + "."
- + random.nextInt(256) + "."
- + random.nextInt(256) + "."
- + random.nextInt(256);
-
- String hostName = uuid;
- DatanodeID tempDataNode = new DatanodeID(ipAddress,
- hostName, uuid, 0, 0, 0, 0);
- RegisteredCommand command =
- (RegisteredCommand) nodeManager.register(tempDataNode);
- return new DatanodeID(command.getDatanodeUUID(), tempDataNode);
- }
-
/**
* Creates a NodeManager.
*
@@ -134,7 +102,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
// Send some heartbeats from different nodes.
for (int x = 0; x < nodeManager.getMinimumChillModeNodes(); x++) {
- DatanodeID datanodeID = getDatanodeID(nodeManager);
+ DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
}
@@ -181,7 +149,7 @@ public class TestNodeManager {
// Need 100 nodes to come out of chill mode, only one node is sending HB.
nodeManager.setMinimumChillModeNodes(100);
- nodeManager.sendHeartbeat(getDatanodeID(nodeManager));
+ nodeManager.sendHeartbeat(SCMTestUtils.getDatanodeID(nodeManager));
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
4 * 1000);
assertFalse("Not enough heartbeat, Node manager should have been in " +
@@ -203,7 +171,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
nodeManager.setMinimumChillModeNodes(3);
- DatanodeID datanodeID = getDatanodeID(nodeManager);
+ DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
// Send 10 heartbeat from same node, and assert we never leave chill mode.
for (int x = 0; x < 10; x++) {
@@ -232,7 +200,7 @@ public class TestNodeManager {
Configuration conf = getConf();
conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
SCMNodeManager nodeManager = createNodeManager(conf);
- DatanodeID datanodeID = getDatanodeID(nodeManager);
+ DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.close();
// These should never be processed.
@@ -262,7 +230,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
for (int x = 0; x < count; x++) {
- DatanodeID datanodeID = getDatanodeID(nodeManager);
+ DatanodeID datanodeID = SCMTestUtils.getDatanodeID(nodeManager);
nodeManager.sendHeartbeat(datanodeID);
}
GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
@@ -346,7 +314,7 @@ public class TestNodeManager {
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
List