HDDS-1306. TestContainerStateManagerIntegration fails in Ratis shutdown. Contributed by Lokesh Jain.
This commit is contained in:
parent
310ebf5dc8
commit
072750c00a
|
@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.container.replication;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||||
|
@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.utils.Scheduler;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -40,9 +41,14 @@ public class ReplicationActivityStatus implements
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(ReplicationActivityStatus.class);
|
LoggerFactory.getLogger(ReplicationActivityStatus.class);
|
||||||
|
|
||||||
|
private Scheduler scheduler;
|
||||||
private AtomicBoolean replicationEnabled = new AtomicBoolean();
|
private AtomicBoolean replicationEnabled = new AtomicBoolean();
|
||||||
private ObjectName jmxObjectName;
|
private ObjectName jmxObjectName;
|
||||||
|
|
||||||
|
public ReplicationActivityStatus(Scheduler scheduler) {
|
||||||
|
this.scheduler = scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isReplicationEnabled() {
|
public boolean isReplicationEnabled() {
|
||||||
return replicationEnabled.get();
|
return replicationEnabled.get();
|
||||||
|
@ -86,17 +92,11 @@ public class ReplicationActivityStatus implements
|
||||||
public void fireReplicationStart(boolean chillModeStatus,
|
public void fireReplicationStart(boolean chillModeStatus,
|
||||||
long waitTime) {
|
long waitTime) {
|
||||||
if (!chillModeStatus) {
|
if (!chillModeStatus) {
|
||||||
CompletableFuture.runAsync(() -> {
|
scheduler.schedule(() -> {
|
||||||
try {
|
|
||||||
Thread.sleep(waitTime);
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
LOG.error("Interrupted during wait, replication event is not fired",
|
|
||||||
ex);
|
|
||||||
}
|
|
||||||
setReplicationEnabled(true);
|
setReplicationEnabled(true);
|
||||||
LOG.info("Replication Timer sleep for {} ms completed. Enable " +
|
LOG.info("Replication Timer sleep for {} ms completed. Enable "
|
||||||
"Replication", waitTime);
|
+ "Replication", waitTime);
|
||||||
});
|
}, waitTime, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,6 +99,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
|
||||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.utils.HddsVersionInfo;
|
import org.apache.hadoop.utils.HddsVersionInfo;
|
||||||
|
import org.apache.hadoop.utils.Scheduler;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -175,6 +176,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
||||||
private SCMMetadataStore scmMetadataStore;
|
private SCMMetadataStore scmMetadataStore;
|
||||||
|
|
||||||
private final EventQueue eventQueue;
|
private final EventQueue eventQueue;
|
||||||
|
private final Scheduler commonScheduler;
|
||||||
/*
|
/*
|
||||||
* HTTP endpoint for JMX access.
|
* HTTP endpoint for JMX access.
|
||||||
*/
|
*/
|
||||||
|
@ -284,7 +286,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
||||||
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
|
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
|
||||||
watcherTimeout);
|
watcherTimeout);
|
||||||
initalizeSystemManagers(conf, configurator);
|
initalizeSystemManagers(conf, configurator);
|
||||||
replicationStatus = new ReplicationActivityStatus();
|
commonScheduler = new Scheduler("SCMCommonScheduler", false, 1);
|
||||||
|
replicationStatus = new ReplicationActivityStatus(commonScheduler);
|
||||||
|
|
||||||
CloseContainerEventHandler closeContainerHandler =
|
CloseContainerEventHandler closeContainerHandler =
|
||||||
new CloseContainerEventHandler(pipelineManager, containerManager);
|
new CloseContainerEventHandler(pipelineManager, containerManager);
|
||||||
|
@ -1004,6 +1007,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
||||||
LOG.error("SCM Event Queue stop failed", ex);
|
LOG.error("SCM Event Queue stop failed", ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
LOG.info("Stopping SCM Common Scheduler.");
|
||||||
|
commonScheduler.close();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.error("SCM Common Scheduler close failed {}", ex);
|
||||||
|
}
|
||||||
|
|
||||||
if (jvmPauseMonitor != null) {
|
if (jvmPauseMonitor != null) {
|
||||||
jvmPauseMonitor.stop();
|
jvmPauseMonitor.stop();
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.utils.Scheduler;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
@ -53,8 +54,8 @@ public class TestChillModeHandler {
|
||||||
"3s");
|
"3s");
|
||||||
scmClientProtocolServer =
|
scmClientProtocolServer =
|
||||||
Mockito.mock(SCMClientProtocolServer.class);
|
Mockito.mock(SCMClientProtocolServer.class);
|
||||||
replicationActivityStatus =
|
replicationActivityStatus = new ReplicationActivityStatus(
|
||||||
new ReplicationActivityStatus();
|
new Scheduler("SCMCommonScheduler", false, 1));
|
||||||
blockManager = Mockito.mock(BlockManagerImpl.class);
|
blockManager = Mockito.mock(BlockManagerImpl.class);
|
||||||
chillModeHandler =
|
chillModeHandler =
|
||||||
new ChillModeHandler(configuration, scmClientProtocolServer,
|
new ChillModeHandler(configuration, scmClientProtocolServer,
|
||||||
|
|
|
@ -32,7 +32,10 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
|
||||||
import org.apache.hadoop.hdds.scm.server
|
import org.apache.hadoop.hdds.scm.server
|
||||||
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
|
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
|
||||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||||
|
import org.apache.hadoop.utils.Scheduler;
|
||||||
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
@ -60,6 +63,17 @@ import static org.apache.hadoop.hdds.scm.container
|
||||||
*/
|
*/
|
||||||
public class TestContainerReportHandler {
|
public class TestContainerReportHandler {
|
||||||
|
|
||||||
|
private static Scheduler scheduler;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() {
|
||||||
|
scheduler = new Scheduler("SCMCommonScheduler", false, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
scheduler.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnderReplicatedContainer()
|
public void testUnderReplicatedContainer()
|
||||||
|
@ -70,7 +84,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
@ -152,7 +166,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
@ -234,7 +248,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
@ -329,7 +343,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
@ -401,7 +415,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
@ -475,7 +489,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
@ -572,7 +586,7 @@ public class TestContainerReportHandler {
|
||||||
final ContainerManager containerManager = Mockito.mock(
|
final ContainerManager containerManager = Mockito.mock(
|
||||||
ContainerManager.class);
|
ContainerManager.class);
|
||||||
final ReplicationActivityStatus replicationActivityStatus =
|
final ReplicationActivityStatus replicationActivityStatus =
|
||||||
new ReplicationActivityStatus();
|
new ReplicationActivityStatus(scheduler);
|
||||||
replicationActivityStatus.enableReplication();
|
replicationActivityStatus.enableReplication();
|
||||||
|
|
||||||
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.utils.Scheduler;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
@ -47,7 +48,8 @@ public class TestReplicationActivityStatus {
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setup() {
|
public static void setup() {
|
||||||
eventQueue = new EventQueue();
|
eventQueue = new EventQueue();
|
||||||
replicationActivityStatus = new ReplicationActivityStatus();
|
replicationActivityStatus = new ReplicationActivityStatus(
|
||||||
|
new Scheduler("SCMCommonScheduler", false, 1));
|
||||||
|
|
||||||
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
|
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
|
||||||
ozoneConfiguration.set(HddsConfigKeys.
|
ozoneConfiguration.set(HddsConfigKeys.
|
||||||
|
|
Loading…
Reference in New Issue