HDDS-351. Add chill mode state to SCM.
Contributed by Ajay Kumar.
This commit is contained in:
parent
36c7c78260
commit
ff64d35716
|
@ -75,4 +75,10 @@ public final class HddsConfigKeys {
|
|||
"hdds.container.close.threshold";
|
||||
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
|
||||
|
||||
// % of containers which should have at least one reported replica
|
||||
// before SCM comes out of chill mode.
|
||||
public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT =
|
||||
"hdds.scm.chillmode.threshold.pct";
|
||||
public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
|
||||
|
||||
}
|
||||
|
|
|
@ -1112,6 +1112,15 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hdds.scm.chillmode.threshold.pct</name>
|
||||
<value>0.99</value>
|
||||
<tag>HDDS,SCM,OPERATION</tag>
|
||||
<description> % of containers which should have at least one
|
||||
reported replica before SCM comes out of chill mode.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>hdds.container.action.max.limit</name>
|
||||
<value>20</value>
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
|||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
|
@ -210,6 +211,10 @@ public class ContainerMapping implements Mapping {
|
|||
// For close containers create pipeline from datanodes with replicas
|
||||
Set<DatanodeDetails> dnWithReplicas = containerStateManager
|
||||
.getContainerReplicas(contInfo.containerID());
|
||||
if (dnWithReplicas.size() == 0) {
|
||||
throw new SCMException("Can't create a pipeline for container with "
|
||||
+ "no replica.", ResultCodes.NO_REPLICA_FOUND);
|
||||
}
|
||||
pipeline =
|
||||
new Pipeline(dnWithReplicas.iterator().next().getUuidString(),
|
||||
contInfo.getState(), ReplicationType.STAND_ALONE,
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
|
|||
.ReplicationCompleted;
|
||||
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
|
||||
import org.apache.hadoop.hdds.server.events.Event;
|
||||
import org.apache.hadoop.hdds.server.events.TypedEvent;
|
||||
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
|
||||
|
@ -60,6 +61,15 @@ public final class SCMEvents {
|
|||
*/
|
||||
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
|
||||
new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
|
||||
|
||||
/**
|
||||
* Event generated on DataNode registration.
|
||||
*/
|
||||
public static final TypedEvent<NodeRegistrationContainerReport>
|
||||
NODE_REGISTRATION_CONT_REPORT = new TypedEvent<>(
|
||||
NodeRegistrationContainerReport.class,
|
||||
"Node_Registration_Container_Report");
|
||||
|
||||
/**
|
||||
* ContainerReports are send out by Datanodes. This report is received by
|
||||
* SCMDatanodeHeartbeatDispatcher and Container_Report Event
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.server;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
|
||||
.NodeRegistrationContainerReport;
|
||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* StorageContainerManager enters chill mode on startup to allow system to
|
||||
* reach a stable state before becoming fully functional. SCM will wait
|
||||
* for certain resources to be reported before coming out of chill mode.
|
||||
*
|
||||
* ChillModeExitRule defines format to define new rules which must be satisfied
|
||||
* to exit Chill mode.
|
||||
* ContainerChillModeRule defines the only exit criteria right now.
|
||||
* On every new datanode registration event this class adds replicas
|
||||
* for reported containers and validates if cutoff threshold for
|
||||
* containers is meet.
|
||||
*/
|
||||
public class SCMChillModeManager implements
|
||||
EventHandler<NodeRegistrationContainerReport> {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(SCMChillModeManager.class);
|
||||
private AtomicBoolean inChillMode = new AtomicBoolean(true);
|
||||
private AtomicLong containerWithMinReplicas = new AtomicLong(0);
|
||||
private Map<String, ChillModeExitRule> exitRules = new HashMap(1);
|
||||
private Configuration config;
|
||||
private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
|
||||
|
||||
SCMChillModeManager(Configuration conf, List<ContainerInfo> allContainers) {
|
||||
this.config = conf;
|
||||
exitRules
|
||||
.put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers));
|
||||
}
|
||||
|
||||
private void validateChillModeExitRules(EventPublisher eventQueue) {
|
||||
for (ChillModeExitRule exitRule : exitRules.values()) {
|
||||
if (!exitRule.validate()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
exitChillMode(eventQueue);
|
||||
}
|
||||
|
||||
private void exitChillMode(EventPublisher eventQueue) {
|
||||
LOG.info("SCM exiting chill mode.");
|
||||
setInChillMode(false);
|
||||
// Emit event to ReplicationManager to start replication.
|
||||
eventQueue.fireEvent(SCMEvents.START_REPLICATION, true);
|
||||
|
||||
// TODO: Remove handler registration as there is no need to listen to
|
||||
// register events anymore.
|
||||
|
||||
for (ChillModeExitRule e : exitRules.values()) {
|
||||
e.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(
|
||||
NodeRegistrationContainerReport nodeRegistrationContainerReport,
|
||||
EventPublisher publisher) {
|
||||
if (getInChillMode()) {
|
||||
exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport);
|
||||
validateChillModeExitRules(publisher);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean getInChillMode() {
|
||||
return inChillMode.get();
|
||||
}
|
||||
|
||||
public void setInChillMode(boolean inChillMode) {
|
||||
this.inChillMode.set(inChillMode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface for defining chill mode exit rules.
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public interface ChillModeExitRule<T> {
|
||||
|
||||
boolean validate();
|
||||
|
||||
void process(T report);
|
||||
|
||||
void cleanup();
|
||||
}
|
||||
|
||||
/**
|
||||
* Class defining Chill mode exit criteria for Containers.
|
||||
*/
|
||||
public class ContainerChillModeRule implements
|
||||
ChillModeExitRule<NodeRegistrationContainerReport> {
|
||||
|
||||
// Required cutoff % for containers with at least 1 reported replica.
|
||||
private double chillModeCutoff;
|
||||
// Containers read from scm db.
|
||||
private Map<Long, ContainerInfo> containerMap;
|
||||
private double maxContainer;
|
||||
|
||||
public ContainerChillModeRule(Configuration conf,
|
||||
List<ContainerInfo> containers) {
|
||||
chillModeCutoff = conf
|
||||
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
|
||||
containerMap = new ConcurrentHashMap<>();
|
||||
if(containers != null) {
|
||||
containers.forEach(c -> {
|
||||
if (c != null) {
|
||||
containerMap.put(c.getContainerID(), c);
|
||||
}
|
||||
});
|
||||
maxContainer = containers.size();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean validate() {
|
||||
if (maxContainer == 0) {
|
||||
return true;
|
||||
}
|
||||
return getCurrentContainerThreshold() >= chillModeCutoff;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getCurrentContainerThreshold() {
|
||||
return (containerWithMinReplicas.doubleValue() / maxContainer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(NodeRegistrationContainerReport reportsProto) {
|
||||
if (maxContainer == 0) {
|
||||
// No container to check.
|
||||
return;
|
||||
}
|
||||
|
||||
reportsProto.getReport().getReportsList().forEach(c -> {
|
||||
if (containerMap.containsKey(c.getContainerID())) {
|
||||
if(containerMap.remove(c.getContainerID()) != null) {
|
||||
containerWithMinReplicas.getAndAdd(1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
LOG.info("SCM in chill mode. {} % containers have at least one reported "
|
||||
+ "replica.", (containerWithMinReplicas.get() / maxContainer) * 100);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
containerMap.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static Logger getLogger() {
|
||||
return LOG;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getCurrentContainerThreshold() {
|
||||
return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE))
|
||||
.getCurrentContainerThreshold();
|
||||
}
|
||||
}
|
|
@ -73,6 +73,8 @@ import static org.apache.hadoop.hdds.protocol.proto
|
|||
|
||||
|
||||
import org.apache.hadoop.hdds.scm.HddsServerUtil;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
|
@ -120,6 +122,7 @@ public class SCMDatanodeProtocolServer implements
|
|||
private final StorageContainerManager scm;
|
||||
private final InetSocketAddress datanodeRpcAddress;
|
||||
private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
|
||||
StorageContainerManager scm, EventPublisher eventPublisher)
|
||||
|
@ -129,6 +132,7 @@ public class SCMDatanodeProtocolServer implements
|
|||
Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null");
|
||||
|
||||
this.scm = scm;
|
||||
this.eventPublisher = eventPublisher;
|
||||
final int handlerCount =
|
||||
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
|
||||
OZONE_SCM_HANDLER_COUNT_DEFAULT);
|
||||
|
@ -197,6 +201,9 @@ public class SCMDatanodeProtocolServer implements
|
|||
== SCMRegisteredResponseProto.ErrorCode.success) {
|
||||
scm.getScmContainerManager().processContainerReports(datanodeDetails,
|
||||
containerReportsProto, true);
|
||||
eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||
new NodeRegistrationContainerReport(datanodeDetails,
|
||||
containerReportsProto));
|
||||
}
|
||||
return getRegisteredResponse(registeredCommand);
|
||||
}
|
||||
|
@ -305,4 +312,16 @@ public class SCMDatanodeProtocolServer implements
|
|||
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper class for events with the datanode origin.
|
||||
*/
|
||||
public static class NodeRegistrationContainerReport extends
|
||||
ReportFromDatanode<ContainerReportsProto> {
|
||||
|
||||
public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails,
|
||||
ContainerReportsProto report) {
|
||||
super(datanodeDetails, report);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -176,6 +176,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
private final LeaseManager<Long> commandWatcherLeaseManager;
|
||||
|
||||
private final ReplicationActivityStatus replicationStatus;
|
||||
private final SCMChillModeManager scmChillModeManager;
|
||||
|
||||
/**
|
||||
* Creates a new StorageContainerManager. Configuration will be updated
|
||||
|
@ -231,7 +232,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
ContainerReportHandler containerReportHandler =
|
||||
new ContainerReportHandler(scmContainerManager, node2ContainerMap,
|
||||
replicationStatus);
|
||||
|
||||
scmChillModeManager = new SCMChillModeManager(conf,
|
||||
getScmContainerManager().getStateManager().getAllContainers());
|
||||
PipelineActionEventHandler pipelineActionEventHandler =
|
||||
new PipelineActionEventHandler();
|
||||
|
||||
|
@ -253,6 +255,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
|
||||
pipelineActionEventHandler);
|
||||
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
|
||||
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||
scmChillModeManager);
|
||||
|
||||
long watcherTimeout =
|
||||
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
|
||||
|
@ -619,9 +623,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
"server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
|
||||
getDatanodeProtocolServer().start();
|
||||
|
||||
replicationStatus.start();
|
||||
httpServer.start();
|
||||
scmBlockManager.start();
|
||||
replicationStatus.start();
|
||||
replicationManager.start();
|
||||
setStartTime();
|
||||
}
|
||||
|
@ -809,6 +813,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
return id2StatMap;
|
||||
}
|
||||
|
||||
public boolean isInChillMode() {
|
||||
return scmChillModeManager.getInChillMode();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getCurrentContainerThreshold() {
|
||||
return scmChillModeManager.getCurrentContainerThreshold();
|
||||
}
|
||||
|
||||
/**
|
||||
* Startup options.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
|
||||
.NodeRegistrationContainerReport;
|
||||
|
||||
/**
|
||||
* Stateless helper functions for Hdds tests.
|
||||
*/
|
||||
public final class HddsTestUtils {
|
||||
|
||||
private HddsTestUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Command Status report object.
|
||||
*
|
||||
* @param numOfContainers number of containers to be included in report.
|
||||
* @return CommandStatusReportsProto
|
||||
*/
|
||||
public static NodeRegistrationContainerReport
|
||||
createNodeRegistrationContainerReport(int numOfContainers) {
|
||||
return new NodeRegistrationContainerReport(
|
||||
TestUtils.randomDatanodeDetails(),
|
||||
TestUtils.getRandomContainerReports(numOfContainers));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create NodeRegistrationContainerReport object.
|
||||
*
|
||||
* @param dnContainers List of containers to be included in report
|
||||
* @return NodeRegistrationContainerReport
|
||||
*/
|
||||
public static NodeRegistrationContainerReport
|
||||
createNodeRegistrationContainerReport(List<ContainerInfo> dnContainers) {
|
||||
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
|
||||
containers = new ArrayList<>();
|
||||
dnContainers.forEach(c -> {
|
||||
containers.add(TestUtils.getRandomContainerInfo(c.getContainerID()));
|
||||
});
|
||||
return new NodeRegistrationContainerReport(
|
||||
TestUtils.randomDatanodeDetails(),
|
||||
TestUtils.getContainerReports(containers));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates list of ContainerInfo.
|
||||
*
|
||||
* @param numContainers number of ContainerInfo to be included in list.
|
||||
* @return List<ContainerInfo>
|
||||
*/
|
||||
public static List<ContainerInfo> getContainerInfo(int numContainers) {
|
||||
List<ContainerInfo> containerInfoList = new ArrayList<>();
|
||||
for (int i = 0; i < numContainers; i++) {
|
||||
ContainerInfo.Builder builder = new ContainerInfo.Builder();
|
||||
containerInfoList.add(builder
|
||||
.setContainerID(RandomUtils.nextLong())
|
||||
.build());
|
||||
}
|
||||
return containerInfoList;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.server;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.HddsTestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.events.SCMEvents;
|
||||
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
/** Test class for SCMChillModeManager.
|
||||
*/
|
||||
public class TestSCMChillModeManager {
|
||||
|
||||
private static EventQueue queue;
|
||||
private SCMChillModeManager scmChillModeManager;
|
||||
private static Configuration config;
|
||||
private List<ContainerInfo> containers;
|
||||
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(1000 * 20);
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
queue = new EventQueue();
|
||||
config = new OzoneConfiguration();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChillModeState() throws Exception {
|
||||
// Test 1: test for 0 containers
|
||||
testChillMode(0);
|
||||
|
||||
// Test 2: test for 20 containers
|
||||
testChillMode(20);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChillModeStateWithNullContainers() {
|
||||
new SCMChillModeManager(config, null);
|
||||
}
|
||||
|
||||
private void testChillMode(int numContainers) throws Exception {
|
||||
containers = new ArrayList<>();
|
||||
containers.addAll(HddsTestUtils.getContainerInfo(numContainers));
|
||||
scmChillModeManager = new SCMChillModeManager(config, containers);
|
||||
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||
scmChillModeManager);
|
||||
assertTrue(scmChillModeManager.getInChillMode());
|
||||
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||
HddsTestUtils.createNodeRegistrationContainerReport(containers));
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return !scmChillModeManager.getInChillMode();
|
||||
}, 100, 1000 * 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChillModeExitRule() throws Exception {
|
||||
containers = new ArrayList<>();
|
||||
containers.addAll(HddsTestUtils.getContainerInfo(25 * 4));
|
||||
scmChillModeManager = new SCMChillModeManager(config, containers);
|
||||
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||
scmChillModeManager);
|
||||
assertTrue(scmChillModeManager.getInChillMode());
|
||||
|
||||
testContainerThreshold(containers.subList(0, 25), 0.25);
|
||||
assertTrue(scmChillModeManager.getInChillMode());
|
||||
testContainerThreshold(containers.subList(25, 50), 0.50);
|
||||
assertTrue(scmChillModeManager.getInChillMode());
|
||||
testContainerThreshold(containers.subList(50, 75), 0.75);
|
||||
assertTrue(scmChillModeManager.getInChillMode());
|
||||
testContainerThreshold(containers.subList(75, 100), 1.0);
|
||||
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return !scmChillModeManager.getInChillMode();
|
||||
}, 100, 1000 * 5);
|
||||
}
|
||||
|
||||
private void testContainerThreshold(List<ContainerInfo> dnContainers,
|
||||
double expectedThreshold)
|
||||
throws Exception {
|
||||
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
|
||||
HddsTestUtils.createNodeRegistrationContainerReport(dnContainers));
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
double threshold = scmChillModeManager.getCurrentContainerThreshold();
|
||||
return threshold == expectedThreshold;
|
||||
}, 100, 2000 * 9);
|
||||
}
|
||||
|
||||
}
|
|
@ -178,10 +178,25 @@ public interface MiniOzoneCluster {
|
|||
void shutdownHddsDatanode(DatanodeDetails dn) throws IOException;
|
||||
|
||||
/**
|
||||
* Shutdown the MiniOzoneCluster.
|
||||
* Shutdown the MiniOzoneCluster and delete the storage dirs.
|
||||
*/
|
||||
void shutdown();
|
||||
|
||||
/**
|
||||
* Stop the MiniOzoneCluster without any cleanup.
|
||||
*/
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* Start Scm.
|
||||
*/
|
||||
void startScm() throws IOException;
|
||||
|
||||
/**
|
||||
* Start DataNodes.
|
||||
*/
|
||||
void startHddsDatanodes();
|
||||
|
||||
/**
|
||||
* Builder class for MiniOzoneCluster.
|
||||
*/
|
||||
|
@ -209,6 +224,7 @@ public interface MiniOzoneCluster {
|
|||
protected int numOfOmHandlers = 20;
|
||||
protected int numOfScmHandlers = 20;
|
||||
protected int numOfDatanodes = 1;
|
||||
protected boolean startDataNodes = true;
|
||||
|
||||
protected Builder(OzoneConfiguration conf) {
|
||||
this.conf = conf;
|
||||
|
@ -229,6 +245,11 @@ public interface MiniOzoneCluster {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setStartDataNodes(boolean startDataNodes) {
|
||||
this.startDataNodes = startDataNodes;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the SCM id.
|
||||
*
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.net.NetUtils;
|
|||
import org.apache.hadoop.ozone.client.OzoneClient;
|
||||
import org.apache.hadoop.ozone.client.OzoneClientFactory;
|
||||
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
||||
import org.apache.hadoop.ozone.common.Storage.StorageState;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorage;
|
||||
|
@ -276,32 +277,54 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
|
|||
File baseDir = new File(GenericTestUtils.getTempPath(
|
||||
MiniOzoneClusterImpl.class.getSimpleName() + "-" +
|
||||
scm.getClientProtocolServer().getScmInfo().getClusterId()));
|
||||
stop();
|
||||
FileUtils.deleteDirectory(baseDir);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception while shutting down the cluster.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
LOG.info("Stopping the Mini Ozone Cluster");
|
||||
if (ozoneManager != null) {
|
||||
LOG.info("Shutting down the OzoneManager");
|
||||
LOG.info("Stopping the OzoneManager");
|
||||
ozoneManager.stop();
|
||||
ozoneManager.join();
|
||||
}
|
||||
|
||||
if (scm != null) {
|
||||
LOG.info("Shutting down the StorageContainerManager");
|
||||
LOG.info("Stopping the StorageContainerManager");
|
||||
scm.stop();
|
||||
scm.join();
|
||||
}
|
||||
|
||||
if (!hddsDatanodes.isEmpty()) {
|
||||
LOG.info("Shutting down the HddsDatanodes");
|
||||
LOG.info("Shutting the HddsDatanodes");
|
||||
for (HddsDatanodeService hddsDatanode : hddsDatanodes) {
|
||||
hddsDatanode.stop();
|
||||
hddsDatanode.join();
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception while shutting down the cluster.", e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start Scm.
|
||||
*/
|
||||
@Override
|
||||
public void startScm() throws IOException {
|
||||
scm.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Start DataNodes.
|
||||
*/
|
||||
@Override
|
||||
public void startHddsDatanodes() {
|
||||
hddsDatanodes.forEach((datanode) -> datanode.start(null));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Builder for configuring the MiniOzoneCluster to run.
|
||||
*/
|
||||
|
@ -324,9 +347,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
|
|||
scm.start();
|
||||
OzoneManager om = createOM();
|
||||
om.start();
|
||||
List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
|
||||
hddsDatanodes.forEach((datanode) -> datanode.start(null));
|
||||
return new MiniOzoneClusterImpl(conf, om, scm, hddsDatanodes);
|
||||
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
|
||||
MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
|
||||
hddsDatanodes);
|
||||
if (startDataNodes) {
|
||||
cluster.startHddsDatanodes();
|
||||
}
|
||||
return cluster;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -352,13 +379,30 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
|
|||
private StorageContainerManager createSCM() throws IOException {
|
||||
configureSCM();
|
||||
SCMStorage scmStore = new SCMStorage(conf);
|
||||
initializeScmStorage(scmStore);
|
||||
return StorageContainerManager.createSCM(null, conf);
|
||||
}
|
||||
|
||||
private void initializeScmStorage(SCMStorage scmStore) throws IOException {
|
||||
if (scmStore.getState() == StorageState.INITIALIZED) {
|
||||
return;
|
||||
}
|
||||
scmStore.setClusterId(clusterId);
|
||||
if (!scmId.isPresent()) {
|
||||
scmId = Optional.of(UUID.randomUUID().toString());
|
||||
}
|
||||
scmStore.setScmId(scmId.get());
|
||||
scmStore.initialize();
|
||||
return StorageContainerManager.createSCM(null, conf);
|
||||
}
|
||||
|
||||
private void initializeOmStorage(OMStorage omStorage) throws IOException{
|
||||
if (omStorage.getState() == StorageState.INITIALIZED) {
|
||||
return;
|
||||
}
|
||||
omStorage.setClusterId(clusterId);
|
||||
omStorage.setScmId(scmId.get());
|
||||
omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
|
||||
omStorage.initialize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -371,10 +415,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
|
|||
private OzoneManager createOM() throws IOException {
|
||||
configureOM();
|
||||
OMStorage omStore = new OMStorage(conf);
|
||||
omStore.setClusterId(clusterId);
|
||||
omStore.setScmId(scmId.get());
|
||||
omStore.setOmId(omId.orElse(UUID.randomUUID().toString()));
|
||||
omStore.initialize();
|
||||
initializeOmStorage(omStore);
|
||||
return OzoneManager.createOm(null, conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,59 +17,63 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorage;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
|
||||
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
|
||||
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import com.google.common.util.concurrent.AtomicDouble;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.apache.commons.lang3.RandomUtils;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
|
||||
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
|
||||
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMChillModeManager;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorage;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
|
||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
import static org.apache.hadoop.hdds
|
||||
.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.junit.Assert.fail;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test class that exercises the StorageContainerManager.
|
||||
|
@ -78,6 +82,8 @@ public class TestStorageContainerManager {
|
|||
private static XceiverClientManager xceiverClientManager =
|
||||
new XceiverClientManager(
|
||||
new OzoneConfiguration());
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(TestStorageContainerManager.class);
|
||||
/**
|
||||
* Set the timeout for every test.
|
||||
*/
|
||||
|
@ -457,4 +463,92 @@ public class TestStorageContainerManager {
|
|||
Assert.assertEquals(clusterId, scmInfo.getClusterId());
|
||||
Assert.assertEquals(scmId, scmInfo.getScmId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSCMChillMode() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
|
||||
.setHbInterval(1000)
|
||||
.setNumDatanodes(3)
|
||||
.setStartDataNodes(false)
|
||||
.setHbProcessorInterval(500);
|
||||
MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) builder.build();
|
||||
// Test1: Test chill mode when there are no containers in system.
|
||||
assertTrue(cluster.getStorageContainerManager().isInChillMode());
|
||||
cluster.startHddsDatanodes();
|
||||
cluster.waitForClusterToBeReady();
|
||||
assertFalse(cluster.getStorageContainerManager().isInChillMode());
|
||||
|
||||
// Test2: Test chill mode when containers are there in system.
|
||||
// Create {numKeys} random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100*2, 4096);
|
||||
final List<ContainerInfo> containers = cluster.getStorageContainerManager()
|
||||
.getScmContainerManager().getStateManager().getAllContainers();
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return containers.size() > 10;
|
||||
}, 100, 1000);
|
||||
|
||||
// Removing some container to keep them open.
|
||||
containers.remove(0);
|
||||
containers.remove(1);
|
||||
containers.remove(2);
|
||||
containers.remove(3);
|
||||
|
||||
// Close remaining containers
|
||||
ContainerMapping mapping = (ContainerMapping) cluster
|
||||
.getStorageContainerManager().getScmContainerManager();
|
||||
containers.forEach(c -> {
|
||||
try {
|
||||
mapping.updateContainerState(c.getContainerID(),
|
||||
HddsProtos.LifeCycleEvent.FINALIZE);
|
||||
mapping.updateContainerState(c.getContainerID(),
|
||||
LifeCycleEvent.CLOSE);
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to change state of open containers.", e);
|
||||
}
|
||||
});
|
||||
cluster.stop();
|
||||
|
||||
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
|
||||
.captureLogs(SCMChillModeManager.getLogger());
|
||||
logCapturer.clearOutput();
|
||||
AtomicReference<MiniOzoneCluster> miniCluster = new AtomicReference<>();
|
||||
new Thread(() -> {
|
||||
try {
|
||||
miniCluster.set(builder.setStartDataNodes(false).build());
|
||||
} catch (IOException e) {
|
||||
fail("failed");
|
||||
}
|
||||
}).start();
|
||||
|
||||
StorageContainerManager scm;
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
return miniCluster.get() != null;
|
||||
}, 100, 1000 * 3);
|
||||
|
||||
scm = miniCluster.get().getStorageContainerManager();
|
||||
assertTrue(scm.isInChillMode());
|
||||
assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode."));
|
||||
assertTrue(scm.getCurrentContainerThreshold() == 0);
|
||||
AtomicDouble curThreshold = new AtomicDouble();
|
||||
AtomicDouble lastReportedThreshold = new AtomicDouble();
|
||||
for(HddsDatanodeService dn:miniCluster.get().getHddsDatanodes()){
|
||||
dn.start(null);
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
curThreshold.set(scm.getCurrentContainerThreshold());
|
||||
return curThreshold.get() > lastReportedThreshold.get();
|
||||
}, 100, 1000 * 5);
|
||||
lastReportedThreshold.set(curThreshold.get());
|
||||
}
|
||||
double chillModeCutoff = conf
|
||||
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
|
||||
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
|
||||
assertTrue(scm.getCurrentContainerThreshold() >= chillModeCutoff);
|
||||
assertTrue(logCapturer.getOutput().contains("SCM exiting chill mode."));
|
||||
assertFalse(scm.isInChillMode());
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue