HDDS-362. Modify functions impacted by SCM chill mode in ScmBlockLocationProtocol. Contributed by Ajay Kumar.

This commit is contained in:
Xiaoyu Yao 2018-09-16 17:55:46 -07:00
parent 07385f886e
commit 95231f1749
10 changed files with 360 additions and 44 deletions

View File

@ -171,6 +171,13 @@ enum ReplicationFactor {
THREE = 3;
}
enum ScmOps {
allocateBlock = 1;
keyBlocksInfoList = 2;
getScmInfo = 3;
deleteBlock = 4;
}
/**
* Block ID that uniquely identify a block by SCM.
*/

View File

@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.block;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@ -28,6 +29,9 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.server.ChillModePrecheck;
import org.apache.hadoop.hdds.scm.server.Precheck;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.hdds.client.BlockID;
@ -61,7 +65,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
/** Block Manager manages the block access for SCM. */
public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
public class BlockManagerImpl implements EventHandler<Boolean>,
BlockManager, BlockmanagerMXBean {
private static final Logger LOG =
LoggerFactory.getLogger(BlockManagerImpl.class);
// TODO : FIX ME : Hard coding the owner.
@ -80,6 +85,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
private final int containerProvisionBatchSize;
private final Random rand;
private ObjectName mxBean;
private ChillModePrecheck chillModePrecheck;
/**
* Constructor.
@ -125,6 +131,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
nodeManager, eventPublisher, svcInterval, serviceTimeout, conf);
chillModePrecheck = new ChillModePrecheck();
}
/**
@ -187,19 +194,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
ReplicationType type, ReplicationFactor factor, String owner)
throws IOException {
LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
preCheck(ScmOps.allocateBlock, chillModePrecheck);
if (size < 0 || size > containerSize) {
LOG.warn("Invalid block size requested : {}", size);
throw new SCMException("Unsupported block size: " + size,
INVALID_BLOCK_SIZE);
}
if (!nodeManager.isOutOfChillMode()) {
LOG.warn("Not out of Chill mode.");
throw new SCMException("Unable to create block while in chill mode",
CHILL_MODE_EXCEPTION);
}
/*
Here is the high level logic.
@ -430,4 +431,36 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
public SCMBlockDeletingService getSCMBlockDeletingService() {
return this.blockDeletingService;
}
/**
* Perform all prechecks for given operations.
*
* @param operation
* @param preChecks prechecks to be performed
*/
public void preCheck(ScmOps operation, Precheck... preChecks)
throws SCMException {
for (Precheck preCheck : preChecks) {
preCheck.check(operation);
}
}
@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
this.chillModePrecheck.setInChillMode(inChillMode);
}
/**
* Returns status of scm chill mode determined by CHILL_MODE_STATUS event.
* */
public boolean isScmInChillMode() {
return this.chillModePrecheck.isInChillMode();
}
/**
* Get class logger.
* */
public static Logger getLogger() {
return LOG;
}
}

View File

@ -21,7 +21,6 @@ import javax.management.ObjectName;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
@ -33,16 +32,22 @@ import org.slf4j.LoggerFactory;
/**
* Event listener to track the current state of replication.
*/
public class ReplicationActivityStatus
implements EventHandler<Boolean>, ReplicationActivityStatusMXBean,
Closeable {
public class ReplicationActivityStatus implements
ReplicationActivityStatusMXBean, Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationActivityStatus.class);
private AtomicBoolean replicationEnabled = new AtomicBoolean();
private AtomicBoolean replicationStatusSetExternally = new AtomicBoolean();
private ObjectName jmxObjectName;
private ReplicationStatusListener replicationStatusListener;
private ChillModeStatusListener chillModeStatusListener;
public ReplicationActivityStatus(){
replicationStatusListener = new ReplicationStatusListener();
chillModeStatusListener = new ChillModeStatusListener();
}
public boolean isReplicationEnabled() {
return replicationEnabled.get();
@ -58,13 +63,6 @@ public class ReplicationActivityStatus
replicationEnabled.set(true);
}
/**
* The replication status could be set by async events.
*/
@Override
public void onMessage(Boolean enabled, EventPublisher publisher) {
replicationEnabled.set(enabled);
}
public void start() {
try {
@ -83,4 +81,37 @@ public class ReplicationActivityStatus
MBeans.unregister(jmxObjectName);
}
}
/**
* Replication status listener.
*/
class ReplicationStatusListener implements EventHandler<Boolean> {
@Override
public void onMessage(Boolean status, EventPublisher publisher) {
replicationStatusSetExternally.set(true);
replicationEnabled.set(status);
}
}
/**
* Replication status is influenced by Chill mode status as well.
*/
class ChillModeStatusListener implements EventHandler<Boolean> {
@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
if (!replicationStatusSetExternally.get()) {
replicationEnabled.set(!inChillMode);
}
}
}
public ReplicationStatusListener getReplicationStatusListener() {
return replicationStatusListener;
}
public ChillModeStatusListener getChillModeStatusListener() {
return chillModeStatusListener;
}
}

View File

@ -231,6 +231,8 @@ public final class SCMEvents {
*/
public static final TypedEvent<Boolean> START_REPLICATION =
new TypedEvent<>(Boolean.class);
public static final TypedEvent<Boolean> CHILL_MODE_STATUS =
new TypedEvent<>(Boolean.class);
/**
* Private Ctor. Never Constructed.

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.server.SCMChillModeManager.ChillModeRestrictedOps;
/**
* Chill mode pre-check for SCM operations.
* */
public class ChillModePrecheck implements Precheck<ScmOps> {
private boolean inChillMode;
public static final String PRECHECK_TYPE = "ChillModePrecheck";
public boolean check(ScmOps op) throws SCMException {
if(inChillMode && ChillModeRestrictedOps.isRestrictedInChillMode(op)) {
throw new SCMException("ChillModePrecheck failed for " + op,
ResultCodes.CHILL_MODE_EXCEPTION);
}
return inChillMode;
}
@Override
public String type() {
return PRECHECK_TYPE;
}
public boolean isInChillMode() {
return inChillMode;
}
public void setInChillMode(boolean inChillMode) {
this.inChillMode = inChillMode;
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
/**
* Precheck for SCM operations.
* */
public interface Precheck<T> {
boolean check(T t) throws SCMException;
String type();
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.server;
import com.google.common.annotations.VisibleForTesting;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -26,12 +27,14 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,16 +60,27 @@ public class SCMChillModeManager implements
private Map<String, ChillModeExitRule> exitRules = new HashMap(1);
private Configuration config;
private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
private final EventQueue eventPublisher;
SCMChillModeManager(Configuration conf, List<ContainerInfo> allContainers,
EventPublisher eventQueue) {
EventQueue eventQueue) {
this.config = conf;
this.eventPublisher = eventQueue;
exitRules
.put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers));
if (!conf.getBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT)) {
exitChillMode(eventQueue);
}
emitChillModeStatus();
}
/**
* Emit Chill mode status.
*/
@VisibleForTesting
public void emitChillModeStatus() {
eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, inChillMode.get());
}
private void validateChillModeExitRules(EventPublisher eventQueue) {
@ -78,11 +92,18 @@ public class SCMChillModeManager implements
exitChillMode(eventQueue);
}
private void exitChillMode(EventPublisher eventQueue) {
/**
* Exit chill mode. It does following actions:
* 1. Set chill mode status to fale.
* 2. Emits START_REPLICATION for ReplicationManager.
* 3. Cleanup resources.
* 4. Emit chill mode status.
* @param eventQueue
*/
@VisibleForTesting
public void exitChillMode(EventPublisher eventQueue) {
LOG.info("SCM exiting chill mode.");
setInChillMode(false);
// Emit event to ReplicationManager to start replication.
eventQueue.fireEvent(SCMEvents.START_REPLICATION, true);
// TODO: Remove handler registration as there is no need to listen to
// register events anymore.
@ -90,6 +111,7 @@ public class SCMChillModeManager implements
for (ChillModeExitRule e : exitRules.values()) {
e.cleanup();
}
emitChillModeStatus();
}
@Override
@ -106,6 +128,9 @@ public class SCMChillModeManager implements
return inChillMode.get();
}
/**
* Set chill mode status.
*/
public void setInChillMode(boolean inChillMode) {
this.inChillMode.set(inChillMode);
}
@ -200,4 +225,20 @@ public class SCMChillModeManager implements
return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE))
.getCurrentContainerThreshold();
}
/**
* Operations restricted in SCM chill mode.
*/
public static class ChillModeRestrictedOps {
private static EnumSet restrictedOps = EnumSet.noneOf(ScmOps.class);
static {
restrictedOps.add(ScmOps.allocateBlock);
}
public static boolean isRestrictedInChillMode(ScmOps opName) {
return restrictedOps.contains(opName);
}
}
}

View File

@ -250,7 +250,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationStatus.getReplicationStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
replicationStatus.getChillModeStatusListener());
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
@ -258,6 +261,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,

View File

@ -17,21 +17,28 @@
package org.apache.hadoop.hdds.scm.block;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -40,6 +47,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.apache.hadoop.ozone.OzoneConsts.MB;
@ -47,7 +55,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
/**
* Tests for SCM Block Manager.
*/
public class TestBlockManager {
public class TestBlockManager implements EventHandler<Boolean> {
private static ContainerMapping mapping;
private static MockNodeManager nodeManager;
private static BlockManagerImpl blockManager;
@ -56,26 +64,30 @@ public class TestBlockManager {
private static HddsProtos.ReplicationFactor factor;
private static HddsProtos.ReplicationType type;
private static String containerOwner = "OZONE";
private static EventQueue eventQueue;
@Rule
public ExpectedException thrown = ExpectedException.none();
@BeforeClass
public static void setUp() throws Exception {
@Before
public void setUp() throws Exception {
Configuration conf = SCMTestUtils.getConf();
String path = GenericTestUtils
.getTempPath(TestBlockManager.class.getSimpleName());
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
testDir = Paths.get(path).toFile();
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, path);
eventQueue = new EventQueue();
boolean folderExisted = testDir.exists() || testDir.mkdirs();
if (!folderExisted) {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 10);
mapping =
new ContainerMapping(conf, nodeManager, 128, new EventQueue());
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
mapping = new ContainerMapping(conf, nodeManager, 128, eventQueue);
blockManager = new BlockManagerImpl(conf,
nodeManager, mapping, eventQueue);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
if(conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT)){
factor = HddsProtos.ReplicationFactor.THREE;
@ -86,27 +98,45 @@ public class TestBlockManager {
}
}
@AfterClass
public static void cleanup() throws IOException {
@After
public void cleanup() throws IOException {
blockManager.close();
mapping.close();
FileUtil.fullyDelete(testDir);
}
@Before
public void clearChillMode() {
nodeManager.setChillmode(false);
private static StorageContainerManager getScm(OzoneConfiguration conf)
throws IOException {
conf.setBoolean(OZONE_ENABLED, true);
SCMStorage scmStore = new SCMStorage(conf);
if(scmStore.getState() != StorageState.INITIALIZED) {
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
scmStore.setClusterId(clusterId);
scmStore.setScmId(scmId);
// writes the version file properties
scmStore.initialize();
}
return StorageContainerManager.createSCM(null, conf);
}
@Test
public void testAllocateBlock() throws Exception {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode();
}, 10, 1000 * 5);
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor, containerOwner);
Assert.assertNotNull(block);
}
@Test
public void testAllocateOversizedBlock() throws IOException {
public void testAllocateOversizedBlock() throws Exception {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode();
}, 10, 1000 * 5);
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
AllocatedBlock block = blockManager.allocateBlock(size,
@ -115,10 +145,31 @@ public class TestBlockManager {
@Test
public void testChillModeAllocateBlockFails() throws IOException {
nodeManager.setChillmode(true);
thrown.expectMessage("Unable to create block while in chill mode");
public void testAllocateBlockFailureInChillMode() throws Exception {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, true);
GenericTestUtils.waitFor(() -> {
return blockManager.isScmInChillMode();
}, 10, 1000 * 5);
// Test1: In chill mode expect an SCMException.
thrown.expectMessage("ChillModePrecheck failed for "
+ "allocateBlock");
blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor, containerOwner);
}
@Test
public void testAllocateBlockSucInChillMode() throws Exception {
// Test2: Exit chill mode and then try allocateBock again.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode();
}, 10, 1000 * 5);
Assert.assertNotNull(blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
type, factor, containerOwner));
}
@Override
public void onMessage(Boolean aBoolean, EventPublisher publisher) {
System.out.println("test");
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import static org.junit.Assert.*;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests for ReplicationActivityStatus.
*/
public class TestReplicationActivityStatus {
private static EventQueue eventQueue;
private static ReplicationActivityStatus replicationActivityStatus;
@BeforeClass
public static void setup() {
eventQueue = new EventQueue();
replicationActivityStatus = new ReplicationActivityStatus();
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationActivityStatus.getReplicationStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
replicationActivityStatus.getChillModeStatusListener());
}
@Test
public void testReplicationStatusForChillMode()
throws TimeoutException, InterruptedException {
assertFalse(replicationActivityStatus.isReplicationEnabled());
// In chill mode replication process should be stopped.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, true);
assertFalse(replicationActivityStatus.isReplicationEnabled());
// Replication should be enabled when chill mode if off.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {
return replicationActivityStatus.isReplicationEnabled();
}, 10, 1000*5);
assertTrue(replicationActivityStatus.isReplicationEnabled());
}
}