diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index cf6f1cab41c..0a23912ff2c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -48,8 +48,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; - /** * State Machine Class. */ @@ -60,7 +58,6 @@ public class DatanodeStateMachine implements Closeable { private final ExecutorService executorService; private final Configuration conf; private final SCMConnectionManager connectionManager; - private final long heartbeatFrequency; private StateContext context; private final OzoneContainer container; private DatanodeDetails datanodeDetails; @@ -86,7 +83,6 @@ public class DatanodeStateMachine implements Closeable { .setNameFormat("Datanode State Machine Thread - %d").build()); connectionManager = new SCMConnectionManager(conf); context = new StateContext(this.conf, DatanodeStates.getInitState(), this); - heartbeatFrequency = getScmHeartbeatInterval(conf); container = new OzoneContainer(this.datanodeDetails, new OzoneConfiguration(conf), context); nextHB = new AtomicLong(Time.monotonicNow()); @@ -147,6 +143,7 @@ public class DatanodeStateMachine implements Closeable { while (context.getState() != DatanodeStates.SHUTDOWN) { try { LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); + long heartbeatFrequency = context.getHeartbeatFrequency(); nextHB.set(Time.monotonicNow() + heartbeatFrequency); context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 19c949680c0..a342294bd1d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -33,6 +33,8 @@ import org.apache.hadoop.ozone.protocol.commands.CommandStatus; import org.apache.hadoop.ozone.protocol.commands.CommandStatus .CommandStatusBuilder; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +import static org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +68,13 @@ public class StateContext { private final Queue containerActions; private DatanodeStateMachine.DatanodeStates state; + /** + * Starting with a 2 sec heartbeat frequency which will be updated to the + * real HB frequency after scm registration. With this method the + * initial registration could be significant faster. + */ + private AtomicLong heartbeatFrequency = new AtomicLong(2000); + /** * Constructs a StateContext. * @@ -398,4 +407,15 @@ public class StateContext { } return false; } + + public void configureHeartbeatFrequency(){ + heartbeatFrequency.set(getScmHeartbeatInterval(conf)); + } + + /** + * Return current heartbeat frequency in ms. + */ + public long getHeartbeatFrequency() { + return heartbeatFrequency.get(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java index 1758c03fef6..ec2358ae18b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/RunningDatanodeState.java @@ -101,6 +101,7 @@ public class RunningDatanodeState implements DatanodeState { return RegisterEndpointTask.newBuilder() .setConfig(conf) .setEndpointStateMachine(endpoint) + .setContext(context) .setDatanodeDetails(context.getParent().getDatanodeDetails()) .setOzoneContainer(context.getParent().getContainer()) .build(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index 25af4a1d1e9..ccab0956e72 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,7 @@ public final class RegisterEndpointTask implements private Future result; private DatanodeDetails datanodeDetails; private final OzoneContainer datanodeContainerManager; + private StateContext stateContext; /** * Creates a register endpoint task. @@ -60,10 +62,12 @@ public final class RegisterEndpointTask implements */ @VisibleForTesting public RegisterEndpointTask(EndpointStateMachine rpcEndPoint, - Configuration conf, OzoneContainer ozoneContainer) { + Configuration conf, OzoneContainer ozoneContainer, + StateContext context) { this.rpcEndPoint = rpcEndPoint; this.conf = conf; this.datanodeContainerManager = ozoneContainer; + this.stateContext = context; } @@ -124,6 +128,7 @@ public final class RegisterEndpointTask implements rpcEndPoint.getState().getNextState(); rpcEndPoint.setState(nextState); rpcEndPoint.zeroMissedCount(); + this.stateContext.configureHeartbeatFrequency(); } catch (IOException ex) { rpcEndPoint.logIfNeeded(ex); } finally { @@ -150,6 +155,7 @@ public final class RegisterEndpointTask implements private Configuration conf; private DatanodeDetails datanodeDetails; private OzoneContainer container; + private StateContext context; /** * Constructs the builder class. @@ -200,6 +206,10 @@ public final class RegisterEndpointTask implements return this; } + public Builder setContext(StateContext stateContext) { + this.context = stateContext; + return this; + } public RegisterEndpointTask build() { if (endPointStateMachine == null) { @@ -210,8 +220,9 @@ public final class RegisterEndpointTask implements if (conf == null) { LOG.error("No config specified."); - throw new IllegalArgumentException("A valid configration is needed to" + - " construct RegisterEndpoint task"); + throw new IllegalArgumentException( + "A valid configuration is needed to construct RegisterEndpoint " + + "task"); } if (datanodeDetails == null) { @@ -223,13 +234,20 @@ public final class RegisterEndpointTask implements if (container == null) { LOG.error("Container is not specified"); throw new IllegalArgumentException("Container is not specified to " + - "constrict RegisterEndpoint task"); + "construct RegisterEndpoint task"); + } + + if (context == null) { + LOG.error("StateContext is not specified"); + throw new IllegalArgumentException("Container is not specified to " + + "construct RegisterEndpoint task"); } RegisterEndpointTask task = new RegisterEndpointTask(this - .endPointStateMachine, this.conf, this.container); + .endPointStateMachine, this.conf, this.container, this.context); task.setDatanodeDetails(datanodeDetails); return task; } + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 5071d8d7311..5efcdd113ff 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -309,7 +309,8 @@ public class TestEndPoint { when(ozoneContainer.getContainerReport()).thenReturn( TestUtils.getRandomContainerReports(10)); RegisterEndpointTask endpointTask = - new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer); + new RegisterEndpointTask(rpcEndPoint, conf, ozoneContainer, + mock(StateContext.class)); if (!clearDatanodeDetails) { DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); endpointTask.setDatanodeDetails(datanodeDetails);