HDDS-1196. Add a ReplicationStartTimer class. (#567)

This commit is contained in:
Bharat Viswanadham 2019-03-08 07:25:33 -08:00 committed by Nanda kumar
parent e0260417ad
commit 341c076f05
17 changed files with 361 additions and 79 deletions

View File

@ -17,6 +17,9 @@
package org.apache.hadoop.hdds; package org.apache.hadoop.hdds;
import org.apache.hadoop.utils.db.DBProfile; import org.apache.hadoop.utils.db.DBProfile;
import org.apache.ratis.util.TimeDuration;
import java.util.concurrent.TimeUnit;
/** /**
* This class contains constants for configuration keys and default values * This class contains constants for configuration keys and default values
@ -70,6 +73,14 @@ public final class HddsConfigKeys {
"hdds.scm.chillmode.min.datanode"; "hdds.scm.chillmode.min.datanode";
public static final int HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT = 1; public static final int HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT = 1;
/**
 * Configuration key for the time to wait after exiting chill mode before
 * the replication monitor and the cleanup of unhealthy pipelines start.
 */
public static final String
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT =
"hdds.scm.wait.time.after.chillmode.exit";
/**
 * Default for {@link #HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT}, given as a
 * time-duration string (presumably five minutes -- confirm the unit suffix
 * against Configuration#getTimeDuration).
 */
public static final String
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT = "5m";
public static final String HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK = public static final String HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK =
"hdds.scm.chillmode.pipeline-availability.check"; "hdds.scm.chillmode.pipeline-availability.check";
public static final boolean public static final boolean

View File

@ -1289,6 +1289,16 @@
</description> </description>
</property> </property>
<property>
<name>hdds.scm.wait.time.after.chillmode.exit</name>
<value>5m</value>
<tag>HDDS,SCM,OPERATION</tag>
<description> After exiting chill mode, wait for the configured interval
before starting the replication monitor and the cleanup of unhealthy
pipelines.
</description>
</property>
<property> <property>
<name>hdds.scm.chillmode.enabled</name> <name>hdds.scm.chillmode.enabled</name>
<value>true</value> <value>true</value>

View File

@ -79,4 +79,11 @@ public interface BlockManager extends Closeable {
* @return the block deleting service executed in SCM. * @return the block deleting service executed in SCM.
*/ */
SCMBlockDeletingService getSCMBlockDeletingService(); SCMBlockDeletingService getSCMBlockDeletingService();
/**
 * Sets the chill mode status on this block manager.
 *
 * @param chillModeStatus {@code true} if SCM is in chill mode,
 * {@code false} once chill mode has been exited.
 */
void setChillModeStatus(boolean chillModeStatus);
} }

View File

@ -66,8 +66,7 @@ import java.util.function.Predicate;
/** Block Manager manages the block access for SCM. */ /** Block Manager manages the block access for SCM. */
public class BlockManagerImpl implements EventHandler<Boolean>, public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
BlockManager, BlockmanagerMXBean {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(BlockManagerImpl.class); LoggerFactory.getLogger(BlockManagerImpl.class);
// TODO : FIX ME : Hard coding the owner. // TODO : FIX ME : Hard coding the owner.
@ -337,8 +336,8 @@ public class BlockManagerImpl implements EventHandler<Boolean>,
} }
@Override @Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) { public void setChillModeStatus(boolean chillModeStatus) {
this.chillModePrecheck.setInChillMode(inChillMode); this.chillModePrecheck.setInChillMode(chillModeStatus);
} }
/** /**

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.chillmode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.replication.
ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Handles the activities that have to be performed when the chill mode
 * status changes. The status carried by the event is forwarded to the
 * SCMClientProtocolServer and the BlockManager, and the
 * ReplicationActivityStatus is asked to start replication, passing along the
 * configured wait time.
 */
public class ChillModeHandler implements EventHandler<ChillModeStatus> {

  private final SCMClientProtocolServer scmClientProtocolServer;
  private final BlockManager scmBlockManager;
  /** Wait time after chill mode exit, in milliseconds. */
  private final long waitTime;
  /** Last chill mode status seen by this handler. */
  private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
  private final ReplicationActivityStatus replicationActivityStatus;

  /**
   * ChillModeHandler, to handle the logic once we exit chill mode.
   *
   * @param configuration configuration used to read the wait time after
   *                      chill mode exit and whether chill mode is enabled.
   * @param clientProtocolServer server that is informed about the chill
   *                             mode status.
   * @param blockManager block manager that is informed about the chill mode
   *                     status.
   * @param replicationStatus replication status that is asked to start
   *                          replication after the wait time.
   * @throws NullPointerException if any argument is null.
   */
  public ChillModeHandler(Configuration configuration,
      SCMClientProtocolServer clientProtocolServer,
      BlockManager blockManager,
      ReplicationActivityStatus replicationStatus) {
    Objects.requireNonNull(configuration, "Configuration cannot be null");
    Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
        "object cannot be null");
    Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
    Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " +
        "object cannot be null");
    this.waitTime = configuration.getTimeDuration(
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
        TimeUnit.MILLISECONDS);
    scmClientProtocolServer = clientProtocolServer;
    scmBlockManager = blockManager;
    replicationActivityStatus = replicationStatus;

    // The initial status mirrors the configuration: SCM starts in chill mode
    // only when chill mode is enabled.
    boolean chillModeEnabled = configuration.getBoolean(
        HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
        HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
    isInChillMode.set(chillModeEnabled);
  }

  /**
   * Set ChillMode status based on
   * {@link org.apache.hadoop.hdds.scm.events.SCMEvents#CHILL_MODE_STATUS}.
   *
   * Inform BlockManager, ScmClientProtocolServer and
   * ReplicationActivityStatus about the chill mode status.
   *
   * @param chillModeStatus event payload carrying the new chill mode status.
   * @param publisher event publisher; not used by this handler.
   */
  @Override
  public void onMessage(ChillModeStatus chillModeStatus,
      EventPublisher publisher) {
    // Read the status once from the event so that every collaborator gets
    // the same value, even if another event updates the shared flag while
    // this one is still being processed.
    final boolean inChillMode = chillModeStatus.getChillModeStatus();
    isInChillMode.set(inChillMode);

    replicationActivityStatus.fireReplicationStart(inChillMode, waitTime);
    scmClientProtocolServer.setChillModeStatus(inChillMode);
    scmBlockManager.setChillModeStatus(inChillMode);
  }

  /**
   * Returns the chill mode status last recorded by this handler.
   *
   * @return {@code true} if in chill mode, {@code false} otherwise.
   */
  public boolean getChillModeStatus() {
    return isInChillMode.get();
  }
}

View File

@ -73,6 +73,7 @@ public class SCMChillModeManager {
this.isChillModeEnabled = conf.getBoolean( this.isChillModeEnabled = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED, HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT); HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
if (isChillModeEnabled) { if (isChillModeEnabled) {
ContainerChillModeRule containerChillModeRule = ContainerChillModeRule containerChillModeRule =
new ContainerChillModeRule(config, allContainers, this); new ContainerChillModeRule(config, allContainers, this);
@ -111,7 +112,8 @@ public class SCMChillModeManager {
*/ */
@VisibleForTesting @VisibleForTesting
public void emitChillModeStatus() { public void emitChillModeStatus() {
eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS, getInChillMode()); eventPublisher.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new ChillModeStatus(getInChillMode()));
} }
public void validateChillModeExitRules(EventPublisher eventQueue) { public void validateChillModeExitRules(EventPublisher eventQueue) {
@ -185,4 +187,20 @@ public class SCMChillModeManager {
exitRules.get(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE); exitRules.get(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE);
} }
/**
 * Immutable payload of the ChillMode status event.
 */
public static class ChillModeStatus {

  // final: the status never changes after construction, which makes the
  // payload safe to hand from the publishing thread to handler threads.
  private final boolean chillModeStatus;

  /**
   * @param chillModeState the chill mode status carried by this event.
   */
  public ChillModeStatus(boolean chillModeState) {
    this.chillModeStatus = chillModeState;
  }

  /**
   * @return the chill mode status this event was created with.
   */
  public boolean getChillModeStatus() {
    return chillModeStatus;
  }
}
} }

View File

@ -20,11 +20,13 @@ package org.apache.hadoop.hdds.scm.container.replication;
import javax.management.ObjectName; import javax.management.ObjectName;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,15 +41,8 @@ public class ReplicationActivityStatus implements
LoggerFactory.getLogger(ReplicationActivityStatus.class); LoggerFactory.getLogger(ReplicationActivityStatus.class);
private AtomicBoolean replicationEnabled = new AtomicBoolean(); private AtomicBoolean replicationEnabled = new AtomicBoolean();
private AtomicBoolean replicationStatusSetExternally = new AtomicBoolean();
private ObjectName jmxObjectName; private ObjectName jmxObjectName;
private ReplicationStatusListener replicationStatusListener;
private ChillModeStatusListener chillModeStatusListener;
public ReplicationActivityStatus(){
replicationStatusListener = new ReplicationStatusListener();
chillModeStatusListener = new ChillModeStatusListener();
}
@Override @Override
public boolean isReplicationEnabled() { public boolean isReplicationEnabled() {
return replicationEnabled.get(); return replicationEnabled.get();
@ -84,35 +79,26 @@ public class ReplicationActivityStatus implements
} }
/** /**
* Replication status listener. * Waits for
* {@link HddsConfigKeys#HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT} and set
* replicationEnabled to start replication monitor thread.
*/ */
class ReplicationStatusListener implements EventHandler<Boolean> { public void fireReplicationStart(boolean chillModeStatus,
@Override long waitTime) {
public void onMessage(Boolean status, EventPublisher publisher) { if (!chillModeStatus) {
replicationStatusSetExternally.set(true); CompletableFuture.runAsync(() -> {
replicationEnabled.set(status); try {
Thread.sleep(waitTime);
} catch (InterruptedException ex) {
LOG.error("Interrupted during wait, replication event is not fired",
ex);
}
setReplicationEnabled(true);
LOG.info("Replication Timer sleep for {} ms completed. Enable " +
"Replication", waitTime);
});
} }
} }
/**
* Replication status is influenced by Chill mode status as well.
*/
class ChillModeStatusListener implements EventHandler<Boolean> {
@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
if (!replicationStatusSetExternally.get()) {
replicationEnabled.set(!inChillMode);
}
}
}
public ReplicationStatusListener getReplicationStatusListener() {
return replicationStatusListener;
}
public ChillModeStatusListener getChillModeStatusListener() {
return chillModeStatusListener;
}
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus; .ReplicationStatus;
@ -253,8 +254,8 @@ public final class SCMEvents {
*/ */
public static final TypedEvent<Boolean> START_REPLICATION = public static final TypedEvent<Boolean> START_REPLICATION =
new TypedEvent<>(Boolean.class); new TypedEvent<>(Boolean.class);
public static final TypedEvent<Boolean> CHILL_MODE_STATUS = public static final TypedEvent<ChillModeStatus> CHILL_MODE_STATUS =
new TypedEvent<>(Boolean.class); new TypedEvent<>(ChillModeStatus.class);
/** /**
* Private Ctor. Never Constructed. * Private Ctor. Never Constructed.

View File

@ -49,8 +49,6 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
@ -95,7 +93,7 @@ import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
* The RPC server that listens to requests from clients. * The RPC server that listens to requests from clients.
*/ */
public class SCMClientProtocolServer implements public class SCMClientProtocolServer implements
StorageContainerLocationProtocol, EventHandler<Boolean>, Auditor { StorageContainerLocationProtocol, Auditor {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(SCMClientProtocolServer.class); LoggerFactory.getLogger(SCMClientProtocolServer.class);
private static final AuditLogger AUDIT = private static final AuditLogger AUDIT =
@ -496,14 +494,6 @@ public class SCMClientProtocolServer implements
return scm; return scm;
} }
/**
* Set chill mode status based on SCMEvents.CHILL_MODE_STATUS event.
*/
@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
chillModePrecheck.setInChillMode(inChillMode);
}
/** /**
* Set chill mode status based on . * Set chill mode status based on .
*/ */
@ -561,4 +551,13 @@ public class SCMClientProtocolServer implements
public void close() throws IOException { public void close() throws IOException {
stop(); stop();
} }
/**
 * Set ChillMode status by forwarding it to the chill mode precheck of this
 * server.
 *
 * @param chillModeStatus the chill mode status to record; passed on as the
 * "in chill mode" flag of the precheck.
 */
public void setChillModeStatus(boolean chillModeStatus) {
chillModePrecheck.setInChillMode(chillModeStatus);
}
} }

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler; import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager; import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@ -202,6 +203,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private JvmPauseMonitor jvmPauseMonitor; private JvmPauseMonitor jvmPauseMonitor;
private final OzoneConfiguration configuration; private final OzoneConfiguration configuration;
private final ChillModeHandler chillModeHandler;
/** /**
* Creates a new StorageContainerManager. Configuration will be * Creates a new StorageContainerManager. Configuration will be
@ -336,6 +338,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
clientProtocolServer = new SCMClientProtocolServer(conf, this); clientProtocolServer = new SCMClientProtocolServer(conf, this);
httpServer = new StorageContainerManagerHttpServer(conf); httpServer = new StorageContainerManagerHttpServer(conf);
chillModeHandler = new ChillModeHandler(configuration,
clientProtocolServer, scmBlockManager, replicationStatus);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@ -350,20 +355,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
nonHealthyToHealthyNodeHandler); nonHealthyToHealthyNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationStatus.getReplicationStatusListener());
eventQueue eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler); .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS, eventQueue.addHandler(SCMEvents.DELETE_BLOCK_STATUS,
(DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog()); (DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler); eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler); eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
replicationStatus.getChillModeStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
registerMXBean(); registerMXBean();
} }
@ -1079,6 +1077,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return scmBlockManager; return scmBlockManager;
} }
/**
 * Returns the ChillModeHandler held by this StorageContainerManager.
 * Exposed for tests.
 *
 * @return the chill mode handler.
 */
@VisibleForTesting
public ChillModeHandler getChillModeHandler() {
return chillModeHandler;
}
public void checkAdminAccess(String remoteUser) throws IOException { public void checkAdminAccess(String remoteUser) throws IOException {
if (remoteUser != null) { if (remoteUser != null) {
if (!scmAdminUsernames.contains(remoteUser)) { if (!scmAdminUsernames.contains(remoteUser)) {

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@ -71,6 +72,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
private static EventQueue eventQueue; private static EventQueue eventQueue;
private int numContainerPerOwnerInPipeline; private int numContainerPerOwnerInPipeline;
private OzoneConfiguration conf; private OzoneConfiguration conf;
private ChillModeStatus chillModeStatus = new ChillModeStatus(false);
@Rule @Rule
public ExpectedException thrown = ExpectedException.none(); public ExpectedException thrown = ExpectedException.none();
@ -101,7 +103,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
eventQueue = new EventQueue(); eventQueue = new EventQueue();
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scm.getScmBlockManager()); scm.getChillModeHandler());
eventQueue.addHandler(SCMEvents.START_REPLICATION, this); eventQueue.addHandler(SCMEvents.START_REPLICATION, this);
CloseContainerEventHandler closeContainerHandler = CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, mapping); new CloseContainerEventHandler(pipelineManager, mapping);
@ -123,7 +125,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test @Test
public void testAllocateBlock() throws Exception { public void testAllocateBlock() throws Exception {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode(); return !blockManager.isScmInChillMode();
}, 10, 1000 * 5); }, 10, 1000 * 5);
@ -134,7 +136,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test @Test
public void testAllocateOversizedBlock() throws Exception { public void testAllocateOversizedBlock() throws Exception {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode(); return !blockManager.isScmInChillMode();
}, 10, 1000 * 5); }, 10, 1000 * 5);
@ -147,7 +149,8 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test @Test
public void testAllocateBlockFailureInChillMode() throws Exception { public void testAllocateBlockFailureInChillMode() throws Exception {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, true); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new ChillModeStatus(true));
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
return blockManager.isScmInChillMode(); return blockManager.isScmInChillMode();
}, 10, 1000 * 5); }, 10, 1000 * 5);
@ -161,7 +164,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test @Test
public void testAllocateBlockSucInChillMode() throws Exception { public void testAllocateBlockSucInChillMode() throws Exception {
// Test2: Exit chill mode and then try allocateBock again. // Test2: Exit chill mode and then try allocateBock again.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode(); return !blockManager.isScmInChillMode();
}, 10, 1000 * 5); }, 10, 1000 * 5);
@ -172,7 +175,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testMultipleBlockAllocation() public void testMultipleBlockAllocation()
throws IOException, TimeoutException, InterruptedException { throws IOException, TimeoutException, InterruptedException {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);
GenericTestUtils GenericTestUtils
.waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5); .waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
@ -214,7 +217,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testMultipleBlockAllocationWithClosedContainer() public void testMultipleBlockAllocationWithClosedContainer()
throws IOException, TimeoutException, InterruptedException { throws IOException, TimeoutException, InterruptedException {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);
GenericTestUtils GenericTestUtils
.waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5); .waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);
@ -266,7 +269,7 @@ public class TestBlockManager implements EventHandler<Boolean> {
@Test(timeout = 10000) @Test(timeout = 10000)
public void testBlockAllocationWithNoAvailablePipelines() public void testBlockAllocationWithNoAvailablePipelines()
throws IOException, TimeoutException, InterruptedException { throws IOException, TimeoutException, InterruptedException {
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);
GenericTestUtils GenericTestUtils
.waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5); .waitFor(() -> !blockManager.isScmInChillMode(), 10, 1000 * 5);

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.chillmode;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
/**
 * Tests ChillModeHandler behavior.
 */
public class TestChillModeHandler {

  private OzoneConfiguration configuration;
  private SCMClientProtocolServer scmClientProtocolServer;
  private ReplicationActivityStatus replicationActivityStatus;
  private BlockManager blockManager;
  private ChillModeHandler chillModeHandler;
  private EventQueue eventQueue;
  private SCMChillModeManager.ChillModeStatus chillModeStatus;

  /**
   * Builds a ChillModeHandler with mocked collaborators and a 3s wait time
   * after chill mode exit, and registers it on a fresh event queue.
   *
   * @param enabled value for the chill mode enabled configuration.
   */
  public void setup(boolean enabled) {
    configuration = new OzoneConfiguration();
    configuration.setBoolean(HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
        enabled);
    configuration.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
        "3s");
    scmClientProtocolServer =
        Mockito.mock(SCMClientProtocolServer.class);
    replicationActivityStatus =
        new ReplicationActivityStatus();
    blockManager = Mockito.mock(BlockManagerImpl.class);
    chillModeHandler =
        new ChillModeHandler(configuration, scmClientProtocolServer,
            blockManager, replicationActivityStatus);

    eventQueue = new EventQueue();
    eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
    chillModeStatus = new SCMChillModeManager.ChillModeStatus(false);
  }

  @Test
  public void testChillModeHandlerWithChillModeEnabled() throws Exception {
    setup(true);

    Assert.assertTrue(chillModeHandler.getChillModeStatus());

    eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);

    GenericTestUtils.waitFor(() -> !chillModeHandler.getChillModeStatus(),
        1000, 5000);

    verifyChillModeExitPropagated();

    GenericTestUtils.waitFor(() ->
        replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
  }

  @Test
  public void testChillModeHandlerWithChillModeDisbaled() throws Exception{
    setup(false);

    Assert.assertFalse(chillModeHandler.getChillModeStatus());

    eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, chillModeStatus);

    Assert.assertFalse(chillModeHandler.getChillModeStatus());

    verifyChillModeExitPropagated();

    GenericTestUtils.waitFor(() ->
        replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
  }

  /**
   * Verifies that the handler pushed the "not in chill mode" status to both
   * collaborators. The collaborators are Mockito mocks, so asserting on
   * their getters would pass even if the handler never called them; verify
   * the interaction instead. The timeout covers the asynchronous delivery of
   * the event by the event queue.
   */
  private void verifyChillModeExitPropagated() {
    Mockito.verify(scmClientProtocolServer, Mockito.timeout(5000))
        .setChillModeStatus(false);
    Mockito.verify(blockManager, Mockito.timeout(5000))
        .setChillModeStatus(false);
  }
}

View File

@ -21,11 +21,20 @@ package org.apache.hadoop.hdds.scm.container.replication;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
/** /**
* Tests for ReplicationActivityStatus. * Tests for ReplicationActivityStatus.
@ -39,10 +48,19 @@ public class TestReplicationActivityStatus {
public static void setup() { public static void setup() {
eventQueue = new EventQueue(); eventQueue = new EventQueue();
replicationActivityStatus = new ReplicationActivityStatus(); replicationActivityStatus = new ReplicationActivityStatus();
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationActivityStatus.getReplicationStatusListener()); OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, ozoneConfiguration.set(HddsConfigKeys.
replicationActivityStatus.getChillModeStatusListener()); HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, "3s");
SCMClientProtocolServer scmClientProtocolServer =
Mockito.mock(SCMClientProtocolServer.class);
BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
ChillModeHandler chillModeHandler =
new ChillModeHandler(ozoneConfiguration, scmClientProtocolServer,
blockManager, replicationActivityStatus);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
} }
@Test @Test
@ -50,11 +68,13 @@ public class TestReplicationActivityStatus {
throws TimeoutException, InterruptedException { throws TimeoutException, InterruptedException {
assertFalse(replicationActivityStatus.isReplicationEnabled()); assertFalse(replicationActivityStatus.isReplicationEnabled());
// In chill mode replication process should be stopped. // In chill mode replication process should be stopped.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, true); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(true));
assertFalse(replicationActivityStatus.isReplicationEnabled()); assertFalse(replicationActivityStatus.isReplicationEnabled());
// Replication should be enabled when chill mode is off. // Replication should be enabled when chill mode is off.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false); eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(false));
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
return replicationActivityStatus.isReplicationEnabled(); return replicationActivityStatus.isReplicationEnabled();
}, 10, 1000*5); }, 10, 1000*5);

View File

@ -21,6 +21,10 @@ package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
@ -28,6 +32,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
/** /**
* Test class for {@link SCMClientProtocolServer}. * Test class for {@link SCMClientProtocolServer}.
@ -42,7 +47,12 @@ public class TestSCMClientProtocolServer {
config = new OzoneConfiguration(); config = new OzoneConfiguration();
eventQueue = new EventQueue(); eventQueue = new EventQueue();
scmClientProtocolServer = new SCMClientProtocolServer(config, null); scmClientProtocolServer = new SCMClientProtocolServer(config, null);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, scmClientProtocolServer); BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
ReplicationActivityStatus replicationActivityStatus =
Mockito.mock(ReplicationActivityStatus.class);
ChillModeHandler chillModeHandler = new ChillModeHandler(config,
scmClientProtocolServer, blockManager, replicationActivityStatus);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
} }
@After @After

View File

@ -57,6 +57,7 @@ import org.junit.Test;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import sun.rmi.runtime.Log;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -335,7 +336,8 @@ public class TestScmChillMode {
assertFalse((scm.getClientProtocolServer()).getChillModeStatus()); assertFalse((scm.getClientProtocolServer()).getChillModeStatus());
final List<ContainerInfo> containers = scm.getContainerManager() final List<ContainerInfo> containers = scm.getContainerManager()
.getContainers(); .getContainers();
scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS, true); scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(true));
GenericTestUtils.waitFor(() -> { GenericTestUtils.waitFor(() -> {
return clientProtocolServer.getChillModeStatus(); return clientProtocolServer.getChillModeStatus();
}, 50, 1000 * 5); }, 50, 1000 * 5);

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -93,7 +94,8 @@ public class BenchMarkOzoneManager {
.getPipelines(ReplicationType.RATIS, ReplicationFactor.THREE)) { .getPipelines(ReplicationType.RATIS, ReplicationFactor.THREE)) {
pipelineManager.openPipeline(pipeline.getId()); pipelineManager.openPipeline(pipeline.getId());
} }
scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS, false); scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(false));
Thread.sleep(1000); Thread.sleep(1000);
// prepare OM // prepare OM

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -84,7 +85,8 @@ public class BenchMarkSCM {
.getPipelines(ReplicationType.RATIS, ReplicationFactor.THREE)) { .getPipelines(ReplicationType.RATIS, ReplicationFactor.THREE)) {
pipelineManager.openPipeline(pipeline.getId()); pipelineManager.openPipeline(pipeline.getId());
} }
scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS, false); scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(false));
Thread.sleep(1000); Thread.sleep(1000);
} }
} finally { } finally {