HDDS-351. Add chill mode state to SCM.

Contributed by Ajay Kumar.

(cherry picked from commit ff64d35716)
Anu Engineer 2018-09-07 10:35:45 -07:00
parent be1ec005f1
commit 48bcebc080
12 changed files with 694 additions and 78 deletions

View File

@@ -75,4 +75,10 @@ public final class HddsConfigKeys {
"hdds.container.close.threshold";
public static final float HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
// % of containers which should have at least one reported replica
// before SCM comes out of chill mode.
public static final String HDDS_SCM_CHILLMODE_THRESHOLD_PCT =
"hdds.scm.chillmode.threshold.pct";
public static final double HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT = 0.99;
}
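
A minimal sketch (not part of the patch) of how the new key can be read and
overridden, mirroring ContainerChillModeRule further down; the 0.90 value is
only an illustration:

OzoneConfiguration conf = new OzoneConfiguration();
// Illustration only: require 90 % of containers to have a reported replica.
conf.setDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, 0.90);
double cutoff = conf.getDouble(
    HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
    HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);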

View File

@@ -1112,6 +1112,15 @@
</description>
</property>
<property>
<name>hdds.scm.chillmode.threshold.pct</name>
<value>0.99</value>
<tag>HDDS,SCM,OPERATION</tag>
<description> % of containers which should have at least one
reported replica before SCM comes out of chill mode.
</description>
</property>
<property>
<name>hdds.container.action.max.limit</name>
<value>20</value>

View File

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -210,6 +211,10 @@ public class ContainerMapping implements Mapping {
// For close containers create pipeline from datanodes with replicas
Set<DatanodeDetails> dnWithReplicas = containerStateManager
.getContainerReplicas(contInfo.containerID());
if (dnWithReplicas.isEmpty()) {
throw new SCMException("Can't create a pipeline for container with "
+ "no replica.", ResultCodes.NO_REPLICA_FOUND);
}
pipeline =
new Pipeline(dnWithReplicas.iterator().next().getUuidString(),
contInfo.getState(), ReplicationType.STAND_ALONE,
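
A hedged sketch of what a caller of this path can now expect; the
getContainerWithPipeline() entry point and SCMException#getResult() are
assumed from the surrounding codebase rather than shown in this diff:

try {
  // mapping is a ContainerMapping; containerID identifies a closed container.
  ContainerWithPipeline cp = mapping.getContainerWithPipeline(containerID);
} catch (SCMException e) {
  if (e.getResult() == ResultCodes.NO_REPLICA_FOUND) {
    // Closed container with no reported replica yet (e.g. while SCM is
    // still in chill mode); retry once datanodes have reported.
  }
}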

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -60,6 +61,15 @@ public final class SCMEvents {
*/
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class, "Node_Report");
/**
* Event generated on DataNode registration.
*/
public static final TypedEvent<NodeRegistrationContainerReport>
NODE_REGISTRATION_CONT_REPORT = new TypedEvent<>(
NodeRegistrationContainerReport.class,
"Node_Registration_Container_Report");
/**
* ContainerReports are sent out by Datanodes. This report is received by
* SCMDatanodeHeartbeatDispatcher and Container_Report Event

View File

@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* StorageContainerManager enters chill mode on startup to allow the system to
* reach a stable state before becoming fully functional. SCM will wait
* for certain resources to be reported before coming out of chill mode.
*
* ChillModeExitRule defines the contract for new rules that must be satisfied
* before SCM can exit chill mode.
* ContainerChillModeRule defines the only exit criterion right now.
* On every datanode registration event this class records replicas for the
* reported containers and validates whether the cutoff threshold for
* containers has been met.
*/
public class SCMChillModeManager implements
EventHandler<NodeRegistrationContainerReport> {
private static final Logger LOG =
LoggerFactory.getLogger(SCMChillModeManager.class);
private AtomicBoolean inChillMode = new AtomicBoolean(true);
private AtomicLong containerWithMinReplicas = new AtomicLong(0);
private Map<String, ChillModeExitRule> exitRules = new HashMap<>(1);
private Configuration config;
private static final String CONT_EXIT_RULE = "ContainerChillModeRule";
SCMChillModeManager(Configuration conf, List<ContainerInfo> allContainers) {
this.config = conf;
exitRules
.put(CONT_EXIT_RULE, new ContainerChillModeRule(config, allContainers));
}
private void validateChillModeExitRules(EventPublisher eventQueue) {
for (ChillModeExitRule exitRule : exitRules.values()) {
if (!exitRule.validate()) {
return;
}
}
exitChillMode(eventQueue);
}
private void exitChillMode(EventPublisher eventQueue) {
LOG.info("SCM exiting chill mode.");
setInChillMode(false);
// Emit event to ReplicationManager to start replication.
eventQueue.fireEvent(SCMEvents.START_REPLICATION, true);
// TODO: Remove handler registration as there is no need to listen to
// register events anymore.
for (ChillModeExitRule e : exitRules.values()) {
e.cleanup();
}
}
@Override
public void onMessage(
NodeRegistrationContainerReport nodeRegistrationContainerReport,
EventPublisher publisher) {
if (getInChillMode()) {
exitRules.get(CONT_EXIT_RULE).process(nodeRegistrationContainerReport);
validateChillModeExitRules(publisher);
}
}
public boolean getInChillMode() {
return inChillMode.get();
}
public void setInChillMode(boolean inChillMode) {
this.inChillMode.set(inChillMode);
}
/**
* Interface for defining chill mode exit rules.
*
* @param <T> the type of report the rule consumes.
*/
public interface ChillModeExitRule<T> {
boolean validate();
void process(T report);
void cleanup();
}
/**
* Class defining Chill mode exit criteria for Containers.
*/
public class ContainerChillModeRule implements
ChillModeExitRule<NodeRegistrationContainerReport> {
// Required cutoff % for containers with at least 1 reported replica.
private double chillModeCutoff;
// Containers read from scm db.
private Map<Long, ContainerInfo> containerMap;
private double maxContainer;
public ContainerChillModeRule(Configuration conf,
List<ContainerInfo> containers) {
chillModeCutoff = conf
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
containerMap = new ConcurrentHashMap<>();
if (containers != null) {
containers.forEach(c -> {
if (c != null) {
containerMap.put(c.getContainerID(), c);
}
});
maxContainer = containers.size();
}
}
@Override
public boolean validate() {
if (maxContainer == 0) {
return true;
}
return getCurrentContainerThreshold() >= chillModeCutoff;
}
@VisibleForTesting
public double getCurrentContainerThreshold() {
return (containerWithMinReplicas.doubleValue() / maxContainer);
}
@Override
public void process(NodeRegistrationContainerReport reportsProto) {
if (maxContainer == 0) {
// No container to check.
return;
}
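// Each reported container is removed from the map so it is counted at
// most once, even when several datanodes report replicas of it.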
reportsProto.getReport().getReportsList().forEach(c -> {
if (containerMap.containsKey(c.getContainerID())) {
if (containerMap.remove(c.getContainerID()) != null) {
containerWithMinReplicas.getAndAdd(1);
}
}
});
LOG.info("SCM in chill mode. {} % containers have at least one reported "
+ "replica.", (containerWithMinReplicas.get() / maxContainer) * 100);
}
@Override
public void cleanup() {
containerMap.clear();
}
}
@VisibleForTesting
public static Logger getLogger() {
return LOG;
}
@VisibleForTesting
public double getCurrentContainerThreshold() {
return ((ContainerChillModeRule) exitRules.get(CONT_EXIT_RULE))
.getCurrentContainerThreshold();
}
}
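
A short usage sketch assembled only from pieces this patch adds (EventQueue,
SCMEvents, HddsTestUtils); it mirrors TestSCMChillModeManager below rather
than prescribing production wiring:

EventQueue queue = new EventQueue();
List<ContainerInfo> containers = HddsTestUtils.getContainerInfo(20);
SCMChillModeManager manager = new SCMChillModeManager(conf, containers);
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT, manager);
// Each replayed report marks its containers as having at least one replica;
// once the covered fraction reaches the cutoff, validate() passes, the
// manager fires START_REPLICATION and chill mode is exited.
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
    HddsTestUtils.createNodeRegistrationContainerReport(containers));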

View File

@@ -73,6 +73,8 @@ import static org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -120,6 +122,7 @@ public class SCMDatanodeProtocolServer implements
private final StorageContainerManager scm;
private final InetSocketAddress datanodeRpcAddress;
private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
private final EventPublisher eventPublisher;
public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
StorageContainerManager scm, EventPublisher eventPublisher)
@@ -129,6 +132,7 @@ public class SCMDatanodeProtocolServer implements
Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null");
this.scm = scm;
this.eventPublisher = eventPublisher;
final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT);
@@ -197,6 +201,9 @@ public class SCMDatanodeProtocolServer implements
== SCMRegisteredResponseProto.ErrorCode.success) {
scm.getScmContainerManager().processContainerReports(datanodeDetails,
containerReportsProto, true);
eventPublisher.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
new NodeRegistrationContainerReport(datanodeDetails,
containerReportsProto));
}
return getRegisteredResponse(registeredCommand);
}
@@ -305,4 +312,16 @@ public class SCMDatanodeProtocolServer implements
IOUtils.cleanupWithLogger(LOG, scm.getScmNodeManager());
}
/**
* Wrapper class for events with the datanode origin.
*/
public static class NodeRegistrationContainerReport extends
ReportFromDatanode<ContainerReportsProto> {
public NodeRegistrationContainerReport(DatanodeDetails datanodeDetails,
ContainerReportsProto report) {
super(datanodeDetails, report);
}
}
}

View File

@@ -176,6 +176,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final LeaseManager<Long> commandWatcherLeaseManager;
private final ReplicationActivityStatus replicationStatus;
private final SCMChillModeManager scmChillModeManager;
/**
* Creates a new StorageContainerManager. Configuration will be updated
@@ -231,7 +232,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmContainerManager, node2ContainerMap,
replicationStatus);
scmChillModeManager = new SCMChillModeManager(conf,
getScmContainerManager().getStateManager().getAllContainers());
PipelineActionEventHandler pipelineActionEventHandler =
new PipelineActionEventHandler();
@@ -253,6 +255,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
pipelineActionEventHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -619,9 +623,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
"server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
getDatanodeProtocolServer().start();
httpServer.start();
scmBlockManager.start();
replicationStatus.start();
replicationManager.start();
setStartTime();
}
@@ -809,6 +813,15 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
return id2StatMap;
}
public boolean isInChillMode() {
return scmChillModeManager.getInChillMode();
}
@VisibleForTesting
public double getCurrentContainerThreshold() {
return scmChillModeManager.getCurrentContainerThreshold();
}
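
A small sketch of polling the new accessors, assuming the
GenericTestUtils.waitFor helper used by the tests in this patch:

// scm is a started StorageContainerManager instance.
GenericTestUtils.waitFor(() -> !scm.isInChillMode(),
    100,         // poll interval in ms
    30 * 1000);  // timeout in ms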
/**
* Startup options.
*/

View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
/**
* Stateless helper functions for Hdds tests.
*/
public final class HddsTestUtils {
private HddsTestUtils() {
}
/**
* Create NodeRegistrationContainerReport with randomly generated containers.
*
* @param numOfContainers number of containers to be included in the report.
* @return NodeRegistrationContainerReport
*/
public static NodeRegistrationContainerReport
createNodeRegistrationContainerReport(int numOfContainers) {
return new NodeRegistrationContainerReport(
TestUtils.randomDatanodeDetails(),
TestUtils.getRandomContainerReports(numOfContainers));
}
/**
* Create NodeRegistrationContainerReport object.
*
* @param dnContainers List of containers to be included in report
* @return NodeRegistrationContainerReport
*/
public static NodeRegistrationContainerReport
createNodeRegistrationContainerReport(List<ContainerInfo> dnContainers) {
List<StorageContainerDatanodeProtocolProtos.ContainerInfo>
containers = new ArrayList<>();
dnContainers.forEach(c -> {
containers.add(TestUtils.getRandomContainerInfo(c.getContainerID()));
});
return new NodeRegistrationContainerReport(
TestUtils.randomDatanodeDetails(),
TestUtils.getContainerReports(containers));
}
/**
* Creates list of ContainerInfo.
*
* @param numContainers number of ContainerInfo to be included in list.
* @return List<ContainerInfo>
*/
public static List<ContainerInfo> getContainerInfo(int numContainers) {
List<ContainerInfo> containerInfoList = new ArrayList<>();
for (int i = 0; i < numContainers; i++) {
ContainerInfo.Builder builder = new ContainerInfo.Builder();
containerInfoList.add(builder
.setContainerID(RandomUtils.nextLong())
.build());
}
return containerInfoList;
}
}
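
An illustrative use of the helpers above (it mirrors testContainerThreshold
below): reporting only half of the containers should leave the chill mode
threshold at 0.5, below the 0.99 default cutoff:

List<ContainerInfo> containers = HddsTestUtils.getContainerInfo(50);
NodeRegistrationContainerReport halfReport =
    HddsTestUtils.createNodeRegistrationContainerReport(
        containers.subList(0, 25));
// Firing halfReport at a fresh SCMChillModeManager drives
// getCurrentContainerThreshold() to 25 / 50 = 0.5.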

View File

@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* Test class for SCMChillModeManager.
*/
public class TestSCMChillModeManager {
private static EventQueue queue;
private SCMChillModeManager scmChillModeManager;
private static Configuration config;
private List<ContainerInfo> containers;
@Rule
public Timeout timeout = new Timeout(1000 * 20);
@BeforeClass
public static void setUp() {
queue = new EventQueue();
config = new OzoneConfiguration();
}
@Test
public void testChillModeState() throws Exception {
// Test 1: test for 0 containers
testChillMode(0);
// Test 2: test for 20 containers
testChillMode(20);
}
@Test
public void testChillModeStateWithNullContainers() {
new SCMChillModeManager(config, null);
}
private void testChillMode(int numContainers) throws Exception {
containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(numContainers));
scmChillModeManager = new SCMChillModeManager(config, containers);
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
assertTrue(scmChillModeManager.getInChillMode());
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
HddsTestUtils.createNodeRegistrationContainerReport(containers));
GenericTestUtils.waitFor(() -> {
return !scmChillModeManager.getInChillMode();
}, 100, 1000 * 5);
}
@Test
public void testChillModeExitRule() throws Exception {
containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(25 * 4));
scmChillModeManager = new SCMChillModeManager(config, containers);
queue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
assertTrue(scmChillModeManager.getInChillMode());
testContainerThreshold(containers.subList(0, 25), 0.25);
assertTrue(scmChillModeManager.getInChillMode());
testContainerThreshold(containers.subList(25, 50), 0.50);
assertTrue(scmChillModeManager.getInChillMode());
testContainerThreshold(containers.subList(50, 75), 0.75);
assertTrue(scmChillModeManager.getInChillMode());
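// With the default cutoff of 0.99, only this final batch (100 % coverage)
// satisfies the exit rule and lets SCM leave chill mode.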
testContainerThreshold(containers.subList(75, 100), 1.0);
GenericTestUtils.waitFor(() -> {
return !scmChillModeManager.getInChillMode();
}, 100, 1000 * 5);
}
private void testContainerThreshold(List<ContainerInfo> dnContainers,
double expectedThreshold)
throws Exception {
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
HddsTestUtils.createNodeRegistrationContainerReport(dnContainers));
GenericTestUtils.waitFor(() -> {
double threshold = scmChillModeManager.getCurrentContainerThreshold();
return threshold == expectedThreshold;
}, 100, 2000 * 9);
}
}

View File

@@ -178,10 +178,25 @@ public interface MiniOzoneCluster {
void shutdownHddsDatanode(DatanodeDetails dn) throws IOException;
/**
* Shutdown the MiniOzoneCluster and delete the storage dirs.
*/
void shutdown();
/**
* Stop the MiniOzoneCluster without any cleanup.
*/
void stop();
/**
* Start SCM.
*/
void startScm() throws IOException;
/**
* Start DataNodes.
*/
void startHddsDatanodes();
/**
* Builder class for MiniOzoneCluster.
*/
@@ -209,6 +224,7 @@ public interface MiniOzoneCluster {
protected int numOfOmHandlers = 20;
protected int numOfScmHandlers = 20;
protected int numOfDatanodes = 1;
protected boolean startDataNodes = true;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@@ -229,6 +245,11 @@ public interface MiniOzoneCluster {
return this;
}
public Builder setStartDataNodes(boolean startDataNodes) {
this.startDataNodes = startDataNodes;
return this;
}
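
A quick sketch of the new flag, following the same pattern testSCMChillMode
uses later: build the cluster with datanodes held back, observe chill mode,
then start them explicitly:

MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
    .setNumDatanodes(3)
    .setStartDataNodes(false)
    .build();
// No datanode has registered yet, so SCM stays in chill mode.
assertTrue(cluster.getStorageContainerManager().isInChillMode());
cluster.startHddsDatanodes();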
/**
* Sets the SCM id.
*

View File

@@ -34,6 +34,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.rest.OzoneException;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
@@ -276,32 +277,54 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
File baseDir = new File(GenericTestUtils.getTempPath(
MiniOzoneClusterImpl.class.getSimpleName() + "-" +
scm.getClientProtocolServer().getScmInfo().getClusterId()));
stop();
FileUtils.deleteDirectory(baseDir);
} catch (IOException e) {
LOG.error("Exception while shutting down the cluster.", e);
}
}
@Override
public void stop() {
LOG.info("Stopping the Mini Ozone Cluster");
if (ozoneManager != null) {
LOG.info("Stopping the OzoneManager");
ozoneManager.stop();
ozoneManager.join();
}
if (scm != null) {
LOG.info("Stopping the StorageContainerManager");
scm.stop();
scm.join();
}
if (!hddsDatanodes.isEmpty()) {
LOG.info("Shutting the HddsDatanodes");
for (HddsDatanodeService hddsDatanode : hddsDatanodes) {
hddsDatanode.stop();
hddsDatanode.join();
}
}
}
/**
* Start SCM.
*/
@Override
public void startScm() throws IOException {
scm.start();
}
/**
* Start DataNodes.
*/
@Override
public void startHddsDatanodes() {
hddsDatanodes.forEach((datanode) -> datanode.start(null));
}
/**
* Builder for configuring the MiniOzoneCluster to run.
*/
@@ -324,9 +347,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
scm.start();
OzoneManager om = createOM();
om.start();
final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
hddsDatanodes);
if (startDataNodes) {
cluster.startHddsDatanodes();
}
return cluster;
}
/**
@ -352,13 +379,30 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
private StorageContainerManager createSCM() throws IOException {
configureSCM();
SCMStorage scmStore = new SCMStorage(conf);
initializeScmStorage(scmStore);
return StorageContainerManager.createSCM(null, conf);
}
private void initializeScmStorage(SCMStorage scmStore) throws IOException {
if (scmStore.getState() == StorageState.INITIALIZED) {
return;
}
scmStore.setClusterId(clusterId);
if (!scmId.isPresent()) {
scmId = Optional.of(UUID.randomUUID().toString());
}
scmStore.setScmId(scmId.get());
scmStore.initialize();
}
private void initializeOmStorage(OMStorage omStorage) throws IOException {
if (omStorage.getState() == StorageState.INITIALIZED) {
return;
}
omStorage.setClusterId(clusterId);
omStorage.setScmId(scmId.get());
omStorage.setOmId(omId.orElse(UUID.randomUUID().toString()));
omStorage.initialize();
}
/**
@@ -371,10 +415,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
private OzoneManager createOM() throws IOException {
configureOM();
OMStorage omStore = new OMStorage(conf);
initializeOmStorage(omStore);
return OzoneManager.createOm(null, conf);
}

View File

@@ -17,59 +17,63 @@
*/
package org.apache.hadoop.ozone;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AtomicDouble;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLog;
import org.apache.hadoop.hdds.scm.block.SCMBlockDeletingService;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.server.SCMStorage;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager.StartupOption;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Test class that exercises the StorageContainerManager.
@@ -78,6 +82,8 @@ public class TestStorageContainerManager {
private static XceiverClientManager xceiverClientManager =
new XceiverClientManager(
new OzoneConfiguration());
private static final Logger LOG = LoggerFactory
.getLogger(TestStorageContainerManager.class);
/**
* Set the timeout for every test.
*/
@@ -457,4 +463,92 @@ public class TestStorageContainerManager {
Assert.assertEquals(clusterId, scmInfo.getClusterId());
Assert.assertEquals(scmId, scmInfo.getScmId());
}
@Test
public void testSCMChillMode() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
MiniOzoneCluster.Builder builder = MiniOzoneCluster.newBuilder(conf)
.setHbInterval(1000)
.setNumDatanodes(3)
.setStartDataNodes(false)
.setHbProcessorInterval(500);
MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) builder.build();
// Test1: Test chill mode when there are no containers in the system.
assertTrue(cluster.getStorageContainerManager().isInChillMode());
cluster.startHddsDatanodes();
cluster.waitForClusterToBeReady();
assertFalse(cluster.getStorageContainerManager().isInChillMode());
// Test2: Test chill mode when containers are present in the system.
// Create {numKeys} keys with random names.
TestStorageContainerManagerHelper helper =
new TestStorageContainerManagerHelper(cluster, conf);
Map<String, OmKeyInfo> keyLocations = helper.createKeys(100 * 2, 4096);
final List<ContainerInfo> containers = cluster.getStorageContainerManager()
.getScmContainerManager().getStateManager().getAllContainers();
GenericTestUtils.waitFor(() -> {
return containers.size() > 10;
}, 100, 1000);
// Remove some containers from the list so that they stay open.
containers.remove(0);
containers.remove(1);
containers.remove(2);
containers.remove(3);
// Close remaining containers
ContainerMapping mapping = (ContainerMapping) cluster
.getStorageContainerManager().getScmContainerManager();
containers.forEach(c -> {
try {
mapping.updateContainerState(c.getContainerID(),
HddsProtos.LifeCycleEvent.FINALIZE);
mapping.updateContainerState(c.getContainerID(),
LifeCycleEvent.CLOSE);
} catch (IOException e) {
LOG.info("Failed to change state of open containers.", e);
}
});
cluster.stop();
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(SCMChillModeManager.getLogger());
logCapturer.clearOutput();
AtomicReference<MiniOzoneCluster> miniCluster = new AtomicReference<>();
new Thread(() -> {
try {
miniCluster.set(builder.setStartDataNodes(false).build());
} catch (IOException e) {
fail("failed");
}
}).start();
StorageContainerManager scm;
GenericTestUtils.waitFor(() -> {
return miniCluster.get() != null;
}, 100, 1000 * 3);
scm = miniCluster.get().getStorageContainerManager();
assertTrue(scm.isInChillMode());
assertFalse(logCapturer.getOutput().contains("SCM exiting chill mode."));
assertTrue(scm.getCurrentContainerThreshold() == 0);
AtomicDouble curThreshold = new AtomicDouble();
AtomicDouble lastReportedThreshold = new AtomicDouble();
for (HddsDatanodeService dn : miniCluster.get().getHddsDatanodes()) {
dn.start(null);
GenericTestUtils.waitFor(() -> {
curThreshold.set(scm.getCurrentContainerThreshold());
return curThreshold.get() > lastReportedThreshold.get();
}, 100, 1000 * 5);
lastReportedThreshold.set(curThreshold.get());
}
double chillModeCutoff = conf
.getDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
assertTrue(scm.getCurrentContainerThreshold() >= chillModeCutoff);
assertTrue(logCapturer.getOutput().contains("SCM exiting chill mode."));
assertFalse(scm.isInChillMode());
cluster.shutdown();
}
}