diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 4dc7e0a51c3..98efbf8dbe4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -75,4 +75,10 @@ public final class HddsConfigKeys {
"hdds.container.close.threshold";
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
+ // % of containers which should have at least one reported replica
+ // before SCM comes out of chill mode.
+ public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT =
+ "hdds.scm.chillmode.threshold.pct";
+ public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
+
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 778d641fb1a..be19e901726 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1112,6 +1112,15 @@
+
+ hdds.scm.chillmode.threshold.pct
+ 0.99
+ HDDS,SCM,OPERATION
+ % of containers which should have at least one
+ reported replica before SCM comes out of chill mode.
+
+
+
hdds.container.action.max.limit
20
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 35543397745..5678205a3e1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -210,6 +211,10 @@ public class ContainerMapping implements Mapping {
// For close containers create pipeline from datanodes with replicas
Set dnWithReplicas = containerStateManager
.getContainerReplicas(contInfo.containerID());
+ if (dnWithReplicas.size() == 0) {
+ throw new SCMException("Can't create a pipeline for container with "
+ + "no replica.", ResultCodes.NO_REPLICA_FOUND);
+ }
pipeline =
new Pipeline(dnWithReplicas.iterator().next().getUuidString(),
contInfo.getState(), ReplicationType.STAND_ALONE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 03df8eb0ae6..6985834f26f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -60,6 +61,15 @@ public final class SCMEvents {
*/
public static final TypedEvent NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
+
+ /**
+ * Event generated on DataNode registration.
+ */
+ public static final TypedEvent
+ NODE_REGISTRATION_CONT_REPORT = new TypedEvent<>(
+ NodeRegistrationContainerReport.class,
+ "Node_Registration_Container_Report");
+
/**
* ContainerReports are send out by Datanodes. This report is received by
* SCMDatanodeHeartbeatDispatcher and Container_Report Event
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
new file mode 100644
index 00000000000..d2786372f5c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMChillModeManager.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
+ .NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * StorageContainerManager enters chill mode on startup to allow system to
+ * reach a stable state before becoming fully functional. SCM will wait
+ * for certain resources to be reported before coming out of chill mode.
+ *
+ * ChillModeExitRule defines format to define new rules which must be satisfied
+ * to exit Chill mode.
+ * ContainerChillModeRule defines the only exit criteria right now.
+ * On every new datanode registration event this class adds replicas
+ * for reported containers and validates if cutoff threshold for
+ * containers is meet.
+ */
+public class SCMChillModeManager implements
+ EventHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMChillModeManager.class);
+ private AtomicBoolean inChillMode = new AtomicBoolean(true);
+ private AtomicLong containerWithMinReplicas = new AtomicLong(0);
+ private Map exitRules = new HashMap(1);
+ private Configuration config;
+ private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
+
+ SCMChillModeManager(Configuration conf, List allContainers) {
+ this.config = conf;
+ exitRules
+ .put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers));
+ }
+
+ private void validateChillModeExitRules(EventPublisher eventQueue) {
+ for (ChillModeExitRule exitRule : exitRules.values()) {
+ if (!exitRule.validate()) {
+ return;
+ }
+ }
+ exitChillMode(eventQueue);
+ }
+
+ private void exitChillMode(EventPublisher eventQueue) {
+ LOG.info("SCM exiting chill mode.");
+ setInChillMode(false);
+ // Emit event to ReplicationManager to start replication.
+ eventQueue.fireEvent(SCMEvents.START_REPLICATION, true);
+
+ // TODO: Remove handler registration as there is no need to listen to
+ // register events anymore.
+
+ for (ChillModeExitRule e : exitRules.values()) {
+ e.cleanup();
+ }
+ }
+
+ @Override
+ public void onMessage(
+ NodeRegistrationContainerReport nodeRegistrationContainerReport,
+ EventPublisher publisher) {
+ if (getInChillMode()) {
+ exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport);
+ validateChillModeExitRules(publisher);
+ }
+ }
+
+ public boolean getInChillMode() {
+ return inChillMode.get();
+ }
+
+ public void setInChillMode(boolean inChillMode) {
+ this.inChillMode.set(inChillMode);
+ }
+
+ /**
+ * Interface for defining chill mode exit rules.
+ *
+ * @param
+ */
+ public interface ChillModeExitRule {
+
+ boolean validate();
+
+ void process(T report);
+
+ void cleanup();
+ }
+
+ /**
+ * Class defining Chill mode exit criteria for Containers.
+ */
+ public class ContainerChillModeRule implements
+ ChillModeExitRule {
+
+ // Required cutoff % for containers with at least 1 reported replica.
+ private double chillModeCutoff;
+ // Containers read from scm db.
+ private Map containerMap;
+ private double maxContainer;
+
+ public ContainerChillModeRule(Configuration conf,
+ List containers) {
+ chillModeCutoff = conf
+ .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
+ HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
+ containerMap = new ConcurrentHashMap<>();
+ if(containers != null) {
+ containers.forEach(c -> {
+ if (c != null) {
+ containerMap.put(c.getContainerID(), c);
+ }
+ });
+ maxContainer = containers.size();
+ }
+ }
+
+ @Override
+ public boolean validate() {
+ if (maxContainer == 0) {
+ return true;
+ }
+ return getCurrentContainerThreshold() >= chillModeCutoff;
+ }
+
+ @VisibleForTesting
+ public double getCurrentContainerThreshold() {
+ return (containerWithMinReplicas.doubleValue() / maxContainer);
+ }
+
+ @Override
+ public void process(NodeRegistrationContainerReport reportsProto) {
+ if (maxContainer == 0) {
+ // No container to check.
+ return;
+ }
+
+ reportsProto.getReport().getReportsList().forEach(c -> {
+ if (containerMap.containsKey(c.getContainerID())) {
+ if(containerMap.remove(c.getContainerID()) != null) {
+ containerWithMinReplicas.getAndAdd(1);
+ }
+ }
+ });
+
+ LOG.info("SCM in chill mode. {} % containers have at least one reported "
+ + "replica.", (containerWithMinReplicas.get() / maxContainer) * 100);
+ }
+
+ @Override
+ public void cleanup() {
+ containerMap.clear();
+ }
+ }
+
+ @VisibleForTesting
+ public static Logger getLogger() {
+ return LOG;
+ }
+
+ @VisibleForTesting
+ public double getCurrentContainerThreshold() {
+ return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE))
+ .getCurrentContainerThreshold();
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 92158039cd8..8a09dc899d8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -73,6 +73,8 @@ import static org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.HddsServerUtil;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -120,6 +122,7 @@ public class SCMDatanodeProtocolServer implements
private final StorageContainerManager scm;
private final InetSocketAddress datanodeRpcAddress;
private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
+ private final EventPublisher eventPublisher;
public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
StorageContainerManager scm, EventPublisher eventPublisher)
@@ -129,6 +132,7 @@ public class SCMDatanodeProtocolServer implements
Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null");
this.scm = scm;
+ this.eventPublisher = eventPublisher;
final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
@@ -197,6 +201,9 @@ public class SCMDatanodeProtocolServer implements
== SCMRegisteredResponseProto.ErrorCode.success) {
scm.getScmContainerManager().processContainerReports(datanodeDetails,
containerReportsProto, true);
+ eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
+ new NodeRegistrationContainerReport(datanodeDetails,
+ containerReportsProto));
}
return getRegisteredResponse(registeredCommand);
}
@@ -305,4 +312,16 @@ public class SCMDatanodeProtocolServer implements
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}
+ /**
+ * Wrapper class for events with the datanode origin.
+ */
+ public static class NodeRegistrationContainerReport extends
+ ReportFromDatanode {
+
+ public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails,
+ ContainerReportsProto report) {
+ super(datanodeDetails, report);
+ }
+ }
+
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index b84f399eec1..f505430032c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -176,6 +176,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final LeaseManager commandWatcherLeaseManager;
private final ReplicationActivityStatus replicationStatus;
+ private final SCMChillModeManager scmChillModeManager;
/**
* Creates a new StorageContainerManager. Configuration will be updated
@@ -231,7 +232,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmContainerManager, node2ContainerMap,
replicationStatus);
-
+ scmChillModeManager = new SCMChillModeManager(conf,
+ getScmContainerManager().getStateManager().getAllContainers());
PipelineActionEventHandler pipelineActionEventHandler =
new PipelineActionEventHandler();
@@ -253,6 +255,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
pipelineActionEventHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
+ eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
+ scmChillModeManager);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -619,9 +623,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
"server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
getDatanodeProtocolServer().start();
- replicationStatus.start();
httpServer.start();
scmBlockManager.start();
+ replicationStatus.start();
replicationManager.start();
setStartTime();
}
@@ -809,6 +813,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return id2StatMap;
}
+ public boolean isInChillMode() {
+ return scmChillModeManager.getInChillMode();
+ }
+
+ @VisibleForTesting
+ public double getCurrentContainerThreshold() {
+ return scmChillModeManager.getCurrentContainerThreshold();
+ }
+
/**
* Startup options.
*/
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
new file mode 100644
index 00000000000..50d1eedbbe7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/HddsTestUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
+ .NodeRegistrationContainerReport;
+
+/**
+ * Stateless helper functions for Hdds tests.
+ */
+public final class HddsTestUtils {
+
+ private HddsTestUtils() {
+ }
+
+ /**
+ * Create Command Status report object.
+ *
+ * @param numOfContainers number of containers to be included in report.
+ * @return CommandStatusReportsProto
+ */
+ public static NodeRegistrationContainerReport
+ createNodeRegistrationContainerReport(int numOfContainers) {
+ return new NodeRegistrationContainerReport(
+ TestUtils.randomDatanodeDetails(),
+ TestUtils.getRandomContainerReports(numOfContainers));
+ }
+
+ /**
+ * Create NodeRegistrationContainerReport object.
+ *
+ * @param dnContainers List of containers to be included in report
+ * @return NodeRegistrationContainerReport
+ */
+ public static NodeRegistrationContainerReport
+ createNodeRegistrationContainerReport(List dnContainers) {
+ List
+ containers = new ArrayList<>();
+ dnContainers.forEach(c -> {
+ containers.add(TestUtils.getRandomContainerInfo(c.getContainerID()));
+ });
+ return new NodeRegistrationContainerReport(
+ TestUtils.randomDatanodeDetails(),
+ TestUtils.getContainerReports(containers));
+ }
+
+ /**
+ * Creates list of ContainerInfo.
+ *
+ * @param numContainers number of ContainerInfo to be included in list.
+ * @return List
+ */
+ public static List getContainerInfo(int numContainers) {
+ List containerInfoList = new ArrayList<>();
+ for (int i = 0; i < numContainers; i++) {
+ ContainerInfo.Builder builder = new ContainerInfo.Builder();
+ containerInfoList.add(builder
+ .setContainerID(RandomUtils.nextLong())
+ .build());
+ }
+ return containerInfoList;
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java
new file mode 100644
index 00000000000..e98a9ae1a36
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMChillModeManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.server;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/** Test class for SCMChillModeManager.
+ */
+public class TestSCMChillModeManager {
+
+ private static EventQueue queue;
+ private SCMChillModeManager scmChillModeManager;
+ private static Configuration config;
+ private List containers;
+
+ @Rule
+ public Timeout timeout = new Timeout(1000 * 20);
+
+ @BeforeClass
+ public static void setUp() {
+ queue = new EventQueue();
+ config = new OzoneConfiguration();
+ }
+
+ @Test
+ public void testChillModeState() throws Exception {
+ // Test 1: test for 0 containers
+ testChillMode(0);
+
+ // Test 2: test for 20 containers
+ testChillMode(20);
+ }
+
+ @Test
+ public void testChillModeStateWithNullContainers() {
+ new SCMChillModeManager(config, null);
+ }
+
+ private void testChillMode(int numContainers) throws Exception {
+ containers = new ArrayList<>();
+ containers.addAll(HddsTestUtils.getContainerInfo(numContainers));
+ scmChillModeManager = new SCMChillModeManager(config, containers);
+ queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
+ scmChillModeManager);
+ assertTrue(scmChillModeManager.getInChillMode());
+ queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
+ HddsTestUtils.createNodeRegistrationContainerReport(containers));
+ GenericTestUtils.waitFor(() -> {
+ return !scmChillModeManager.getInChillMode();
+ }, 100, 1000 * 5);
+ }
+
+ @Test
+ public void testChillModeExitRule() throws Exception {
+ containers = new ArrayList<>();
+ containers.addAll(HddsTestUtils.getContainerInfo(25 * 4));
+ scmChillModeManager = new SCMChillModeManager(config, containers);
+ queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
+ scmChillModeManager);
+ assertTrue(scmChillModeManager.getInChillMode());
+
+ testContainerThreshold(containers.subList(0, 25), 0.25);
+ assertTrue(scmChillModeManager.getInChillMode());
+ testContainerThreshold(containers.subList(25, 50), 0.50);
+ assertTrue(scmChillModeManager.getInChillMode());
+ testContainerThreshold(containers.subList(50, 75), 0.75);
+ assertTrue(scmChillModeManager.getInChillMode());
+ testContainerThreshold(containers.subList(75, 100), 1.0);
+
+ GenericTestUtils.waitFor(() -> {
+ return !scmChillModeManager.getInChillMode();
+ }, 100, 1000 * 5);
+ }
+
+ private void testContainerThreshold(List dnContainers,
+ double expectedThreshold)
+ throws Exception {
+ queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
+ HddsTestUtils.createNodeRegistrationContainerReport(dnContainers));
+ GenericTestUtils.waitFor(() -> {
+ double threshold = scmChillModeManager.getCurrentContainerThreshold();
+ return threshold == expectedThreshold;
+ }, 100, 2000 * 9);
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index e11cf9bdcf1..3cba83905ee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -178,10 +178,25 @@ public interface MiniOzoneCluster {
void shutdownHddsDatanode(DatanodeDetails dn) throws IOException;
/**
- * Shutdown the MiniOzoneCluster.
+ * Shutdown the MiniOzoneCluster and delete the storage dirs.
*/
void shutdown();
+ /**
+ * Stop the MiniOzoneCluster without any cleanup.
+ */
+ void stop();
+
+ /**
+ * Start Scm.
+ */
+ void startScm() throws IOException;
+
+ /**
+ * Start DataNodes.
+ */
+ void startHddsDatanodes();
+
/**
* Builder class for MiniOzoneCluster.
*/
@@ -209,6 +224,7 @@ public interface MiniOzoneCluster {
protected int numOfOmHandlers = 20;
protected int numOfScmHandlers = 20;
protected int numOfDatanodes = 1;
+ protected boolean startDataNodes = true;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@@ -229,6 +245,11 @@ public interface MiniOzoneCluster {
return this;
}
+ public Builder setStartDataNodes(boolean startDataNodes) {
+ this.startDataNodes = startDataNodes;
+ return this;
+ }
+
/**
* Sets the SCM id.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 7b9bb0e808d..c2169a351af 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
@@ -276,32 +277,54 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
File baseDir = new File(GenericTestUtils.getTempPath(
MiniOzoneClusterImpl.class.getSimpleName() + "-" +
scm.getClientProtocolServer().getScmInfo().getClusterId()));
+ stop();
FileUtils.deleteDirectory(baseDir);
-
- if (ozoneManager != null) {
- LOG.info("Shutting down the OzoneManager");
- ozoneManager.stop();
- ozoneManager.join();
- }
-
- if (scm != null) {
- LOG.info("Shutting down the StorageContainerManager");
- scm.stop();
- scm.join();
- }
-
- if (!hddsDatanodes.isEmpty()) {
- LOG.info("Shutting down the HddsDatanodes");
- for (HddsDatanodeService hddsDatanode : hddsDatanodes) {
- hddsDatanode.stop();
- hddsDatanode.join();
- }
- }
} catch (IOException e) {
LOG.error("Exception while shutting down the cluster.", e);
}
}
+ @Override
+ public void stop() {
+ LOG.info("Stopping the Mini Ozone Cluster");
+ if (ozoneManager != null) {
+ LOG.info("Stopping the OzoneManager");
+ ozoneManager.stop();
+ ozoneManager.join();
+ }
+
+ if (scm != null) {
+ LOG.info("Stopping the StorageContainerManager");
+ scm.stop();
+ scm.join();
+ }
+
+ if (!hddsDatanodes.isEmpty()) {
+ LOG.info("Shutting the HddsDatanodes");
+ for (HddsDatanodeService hddsDatanode : hddsDatanodes) {
+ hddsDatanode.stop();
+ hddsDatanode.join();
+ }
+ }
+ }
+
+ /**
+ * Start Scm.
+ */
+ @Override
+ public void startScm() throws IOException {
+ scm.start();
+ }
+
+ /**
+ * Start DataNodes.
+ */
+ @Override
+ public void startHddsDatanodes() {
+ hddsDatanodes.forEach((datanode) -> datanode.start(null));
+ }
+
+
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@@ -324,9 +347,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
scm.start();
OzoneManager om = createOM();
om.start();
- List hddsDatanodes = createHddsDatanodes(scm);
- hddsDatanodes.forEach((datanode) -> datanode.start(null));
- return new MiniOzoneClusterImpl(conf, om, scm, hddsDatanodes);
+ final List hddsDatanodes = createHddsDatanodes(scm);
+ MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
+ hddsDatanodes);
+ if (startDataNodes) {
+ cluster.startHddsDatanodes();
+ }
+ return cluster;
}
/**
@@ -352,13 +379,30 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
private StorageContainerManager createSCM() throws IOException {
configureSCM();
SCMStorage scmStore = new SCMStorage(conf);
+ initializeScmStorage(scmStore);
+ return StorageContainerManager.createSCM(null, conf);
+ }
+
+ private void initializeScmStorage(SCMStorage scmStore) throws IOException {
+ if (scmStore.getState() == StorageState.INITIALIZED) {
+ return;
+ }
scmStore.setClusterId(clusterId);
if (!scmId.isPresent()) {
scmId = Optional.of(UUID.randomUUID().toString());
}
scmStore.setScmId(scmId.get());
scmStore.initialize();
- return StorageContainerManager.createSCM(null, conf);
+ }
+
+ private void initializeOmStorage(OMStorage omStorage) throws IOException{
+ if (omStorage.getState() == StorageState.INITIALIZED) {
+ return;
+ }
+ omStorage.setClusterId(clusterId);
+ omStorage.setScmId(scmId.get());
+ omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
+ omStorage.initialize();
}
/**
@@ -371,10 +415,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
private OzoneManager createOM() throws IOException {
configureOM();
OMStorage omStore = new OMStorage(conf);
- omStore.setClusterId(clusterId);
- omStore.setScmId(scmId.get());
- omStore.setOmId(omId.orElse(UUID.randomUUID().toString()));
- omStore.initialize();
+ initializeOmStorage(omStore);
return OzoneManager.createOm(null, conf);
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 8762c0e9eb2..1364d77e9f8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,59 +17,63 @@
*/
package org.apache.hadoop.ozone;
-import java.io.IOException;
-
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
-import org.apache.hadoop.hdds.scm.server.SCMStorage;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
-import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
-import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.junit.Rule;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Map;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import com.google.common.util.concurrent.AtomicDouble;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
+import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.SCMChillModeManager;
+import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
+import org.apache.hadoop.hdds.scm.server.SCMStorage;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
-import org.apache.hadoop.test.GenericTestUtils;
-
-import static org.apache.hadoop.hdds
- .HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.junit.Assert.fail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test class that exercises the StorageContainerManager.
@@ -78,6 +82,8 @@ public class TestStorageContainerManager {
private static XceiverClientManager xceiverClientManager =
new XceiverClientManager(
new OzoneConfiguration());
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestStorageContainerManager.class);
/**
* Set the timeout for every test.
*/
@@ -457,4 +463,92 @@ public class TestStorageContainerManager {
Assert.assertEquals(clusterId, scmInfo.getClusterId());
Assert.assertEquals(scmId, scmInfo.getScmId());
}
+
+ @Test
+ public void testSCMChillMode() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
+ .setHbInterval(1000)
+ .setNumDatanodes(3)
+ .setStartDataNodes(false)
+ .setHbProcessorInterval(500);
+ MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) builder.build();
+ // Test1: Test chill mode when there are no containers in system.
+ assertTrue(cluster.getStorageContainerManager().isInChillMode());
+ cluster.startHddsDatanodes();
+ cluster.waitForClusterToBeReady();
+ assertFalse(cluster.getStorageContainerManager().isInChillMode());
+
+ // Test2: Test chill mode when containers are there in system.
+ // Create {numKeys} random names keys.
+ TestStorageContainerManagerHelper helper =
+ new TestStorageContainerManagerHelper(cluster, conf);
+ Map keyLocations = helper.createKeys(100*2, 4096);
+ final List containers = cluster.getStorageContainerManager()
+ .getScmContainerManager().getStateManager().getAllContainers();
+ GenericTestUtils.waitFor(() -> {
+ return containers.size() > 10;
+ }, 100, 1000);
+
+ // Removing some container to keep them open.
+ containers.remove(0);
+ containers.remove(1);
+ containers.remove(2);
+ containers.remove(3);
+
+ // Close remaining containers
+ ContainerMapping mapping = (ContainerMapping) cluster
+ .getStorageContainerManager().getScmContainerManager();
+ containers.forEach(c -> {
+ try {
+ mapping.updateContainerState(c.getContainerID(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ mapping.updateContainerState(c.getContainerID(),
+ LifeCycleEvent.CLOSE);
+ } catch (IOException e) {
+ LOG.info("Failed to change state of open containers.", e);
+ }
+ });
+ cluster.stop();
+
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(SCMChillModeManager.getLogger());
+ logCapturer.clearOutput();
+ AtomicReference miniCluster = new AtomicReference<>();
+ new Thread(() -> {
+ try {
+ miniCluster.set(builder.setStartDataNodes(false).build());
+ } catch (IOException e) {
+ fail("failed");
+ }
+ }).start();
+
+ StorageContainerManager scm;
+ GenericTestUtils.waitFor(() -> {
+ return miniCluster.get() != null;
+ }, 100, 1000 * 3);
+
+ scm = miniCluster.get().getStorageContainerManager();
+ assertTrue(scm.isInChillMode());
+ assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode."));
+ assertTrue(scm.getCurrentContainerThreshold() == 0);
+ AtomicDouble curThreshold = new AtomicDouble();
+ AtomicDouble lastReportedThreshold = new AtomicDouble();
+ for(HddsDatanodeService dn:miniCluster.get().getHddsDatanodes()){
+ dn.start(null);
+ GenericTestUtils.waitFor(() -> {
+ curThreshold.set(scm.getCurrentContainerThreshold());
+ return curThreshold.get() > lastReportedThreshold.get();
+ }, 100, 1000 * 5);
+ lastReportedThreshold.set(curThreshold.get());
+ }
+ double chillModeCutoff = conf
+ .getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
+ HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
+ assertTrue(scm.getCurrentContainerThreshold() >= chillModeCutoff);
+ assertTrue(logCapturer.getOutput().contains("SCM exiting chill mode."));
+ assertFalse(scm.isInChillMode());
+ cluster.shutdown();
+ }
+
}