diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java index 9de06700f28..2eca221770a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java @@ -20,7 +20,7 @@ import javax.management.ObjectName; import java.io.Closeable; import java.io.IOException; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hdds.HddsConfigKeys; @@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.utils.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +41,14 @@ public class ReplicationActivityStatus implements private static final Logger LOG = LoggerFactory.getLogger(ReplicationActivityStatus.class); + private Scheduler scheduler; private AtomicBoolean replicationEnabled = new AtomicBoolean(); private ObjectName jmxObjectName; + public ReplicationActivityStatus(Scheduler scheduler) { + this.scheduler = scheduler; + } + @Override public boolean isReplicationEnabled() { return replicationEnabled.get(); @@ -86,17 +92,11 @@ public void close() throws IOException { public void fireReplicationStart(boolean chillModeStatus, long waitTime) { if (!chillModeStatus) { - CompletableFuture.runAsync(() -> { - try { - Thread.sleep(waitTime); - } catch (InterruptedException ex) { - LOG.error("Interrupted during wait, replication event is not fired", - ex); - } + scheduler.schedule(() -> { setReplicationEnabled(true); - LOG.info("Replication Timer sleep for {} ms completed. Enable " + - "Replication", waitTime); - }); + LOG.info("Replication Timer sleep for {} ms completed. Enable " + + "Replication", waitTime); + }, waitTime, TimeUnit.MILLISECONDS); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 684b76f9096..a1491998330 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -99,6 +99,7 @@ import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.utils.HddsVersionInfo; +import org.apache.hadoop.utils.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,6 +176,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private SCMMetadataStore scmMetadataStore; private final EventQueue eventQueue; + private final Scheduler commonScheduler; /* * HTTP endpoint for JMX access. */ @@ -284,7 +286,8 @@ public StorageContainerManager(OzoneConfiguration conf, commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher", watcherTimeout); initalizeSystemManagers(conf, configurator); - replicationStatus = new ReplicationActivityStatus(); + commonScheduler = new Scheduler("SCMCommonScheduler", false, 1); + replicationStatus = new ReplicationActivityStatus(commonScheduler); CloseContainerEventHandler closeContainerHandler = new CloseContainerEventHandler(pipelineManager, containerManager); @@ -1004,6 +1007,13 @@ public void stop() { LOG.error("SCM Event Queue stop failed", ex); } + try { + LOG.info("Stopping SCM Common Scheduler."); + commonScheduler.close(); + } catch (Exception ex) { + LOG.error("SCM Common Scheduler close failed {}", ex); + } + if (jvmPauseMonitor != null) { jvmPauseMonitor.stop(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java index 22a3907a7b5..efd69fde71c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.Scheduler; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -53,8 +54,8 @@ public void setup(boolean enabled) { "3s"); scmClientProtocolServer = Mockito.mock(SCMClientProtocolServer.class); - replicationActivityStatus = - new ReplicationActivityStatus(); + replicationActivityStatus = new ReplicationActivityStatus( + new Scheduler("SCMCommonScheduler", false, 1)); blockManager = Mockito.mock(BlockManagerImpl.class); chillModeHandler = new ChillModeHandler(configuration, scmClientProtocolServer, diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 4981c4bba0c..864a1a9936e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -32,7 +32,10 @@ import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.utils.Scheduler; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import org.mockito.stubbing.Answer; @@ -60,6 +63,17 @@ */ public class TestContainerReportHandler { + private static Scheduler scheduler; + + @BeforeClass + public static void setup() { + scheduler = new Scheduler("SCMCommonScheduler", false, 1); + } + + @AfterClass + public static void tearDown() { + scheduler.close(); + } @Test public void testUnderReplicatedContainer() @@ -70,7 +84,7 @@ public void testUnderReplicatedContainer() final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); @@ -152,7 +166,7 @@ public void testOverReplicatedContainer() throws NodeNotFoundException, final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); @@ -234,7 +248,7 @@ public void testOpenToClosing() final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); @@ -329,7 +343,7 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException { final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); @@ -401,7 +415,7 @@ public void testClosingToQuasiClosed() final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); @@ -475,7 +489,7 @@ public void testQuasiClosedWithDifferentOriginNodeReplica() final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); @@ -572,7 +586,7 @@ public void testQuasiClosedWithSameOriginNodeReplica() final ContainerManager containerManager = Mockito.mock( ContainerManager.class); final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(); + new ReplicationActivityStatus(scheduler); replicationActivityStatus.enableReplication(); final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java index 9019792f8a5..c36ba750793 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.utils.Scheduler; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -47,7 +48,8 @@ public class TestReplicationActivityStatus { @BeforeClass public static void setup() { eventQueue = new EventQueue(); - replicationActivityStatus = new ReplicationActivityStatus(); + replicationActivityStatus = new ReplicationActivityStatus( + new Scheduler("SCMCommonScheduler", false, 1)); OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); ozoneConfiguration.set(HddsConfigKeys.