HDDS-1217. Refactor ChillMode rules and chillmode manager. (#558)

This commit is contained in:
Bharat Viswanadham 2019-03-25 13:11:03 -07:00 committed by Nanda kumar
parent 3c45762a0b
commit 8739693514
7 changed files with 498 additions and 201 deletions

View File

@ -17,16 +17,94 @@
*/ */
package org.apache.hadoop.hdds.scm.chillmode; package org.apache.hadoop.hdds.scm.chillmode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
/** /**
* Interface for defining chill mode exit rules. * Abstract class for ChillModeExitRules. When a new rule is added, the new
* rule should extend this abstract class.
*
* Each rule Should do:
* 1. Should add a handler for the event it is looking for during the
* initialization of the rule.
* 2. Add the rule in ScmChillModeManager to list of the rules.
*
* *
* @param <T> * @param <T>
*/ */
public interface ChillModeExitRule<T> { public abstract class ChillModeExitRule<T> implements EventHandler<T> {
boolean validate(); private final SCMChillModeManager chillModeManager;
private final String ruleName;
void process(T report); public ChillModeExitRule(SCMChillModeManager chillModeManager,
String ruleName, EventQueue eventQueue) {
this.chillModeManager = chillModeManager;
this.ruleName = ruleName;
eventQueue.addHandler(getEventType(), this);
}
/**
* Return's the name of this ChillModeExit Rule.
* @return ruleName
*/
public String getRuleName() {
return ruleName;
}
/**
* Return's the event type this chillMode exit rule handles.
* @return TypedEvent
*/
protected abstract TypedEvent<T> getEventType();
/**
* Validate's this rule. If this rule condition is met, returns true, else
* returns false.
* @return boolean
*/
protected abstract boolean validate();
/**
* Actual processing logic for this rule.
* @param report
*/
protected abstract void process(T report);
/**
* Cleanup action's need to be done, once this rule is satisfied.
*/
protected abstract void cleanup();
@Override
public final void onMessage(T report, EventPublisher publisher) {
// TODO: when we have remove handlers, we can remove getInChillmode check
if (scmInChillMode()) {
if (validate()) {
chillModeManager.validateChillModeExitRules(ruleName, publisher);
cleanup();
return;
}
process(report);
if (validate()) {
chillModeManager.validateChillModeExitRules(ruleName, publisher);
cleanup();
}
}
}
/**
* Return true if SCM is in chill mode, else false.
* @return boolean
*/
protected boolean scmInChillMode() {
return chillModeManager.getInChillMode();
}
void cleanup();
} }

View File

@ -22,22 +22,24 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
.NodeRegistrationContainerReport;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.TypedEvent;
/** /**
* Class defining Chill mode exit criteria for Containers. * Class defining Chill mode exit criteria for Containers.
*/ */
public class ContainerChillModeRule implements public class ContainerChillModeRule extends
ChillModeExitRule<NodeRegistrationContainerReport>, ChillModeExitRule<NodeRegistrationContainerReport>{
EventHandler<NodeRegistrationContainerReport> {
// Required cutoff % for containers with at least 1 reported replica. // Required cutoff % for containers with at least 1 reported replica.
private double chillModeCutoff; private double chillModeCutoff;
@ -46,14 +48,20 @@ public class ContainerChillModeRule implements
private double maxContainer; private double maxContainer;
private AtomicLong containerWithMinReplicas = new AtomicLong(0); private AtomicLong containerWithMinReplicas = new AtomicLong(0);
private final SCMChillModeManager chillModeManager;
public ContainerChillModeRule(Configuration conf, public ContainerChillModeRule(String ruleName, EventQueue eventQueue,
Configuration conf,
List<ContainerInfo> containers, SCMChillModeManager manager) { List<ContainerInfo> containers, SCMChillModeManager manager) {
super(manager, ruleName, eventQueue);
chillModeCutoff = conf.getDouble( chillModeCutoff = conf.getDouble(
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT,
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT); HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT_DEFAULT);
chillModeManager = manager;
Preconditions.checkArgument(
(chillModeCutoff >= 0.0 && chillModeCutoff <= 1.0),
HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT +
" value should be >= 0.0 and <= 1.0");
containerMap = new ConcurrentHashMap<>(); containerMap = new ConcurrentHashMap<>();
if(containers != null) { if(containers != null) {
containers.forEach(c -> { containers.forEach(c -> {
@ -67,10 +75,18 @@ public class ContainerChillModeRule implements
}); });
maxContainer = containerMap.size(); maxContainer = containerMap.size();
} }
} }
@Override @Override
public boolean validate() { protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
}
@Override
protected boolean validate() {
return getCurrentContainerThreshold() >= chillModeCutoff; return getCurrentContainerThreshold() >= chillModeCutoff;
} }
@ -83,7 +99,7 @@ public class ContainerChillModeRule implements
} }
@Override @Override
public void process(NodeRegistrationContainerReport reportsProto) { protected void process(NodeRegistrationContainerReport reportsProto) {
reportsProto.getReport().getReportsList().forEach(c -> { reportsProto.getReport().getReportsList().forEach(c -> {
if (containerMap.containsKey(c.getContainerID())) { if (containerMap.containsKey(c.getContainerID())) {
@ -92,37 +108,17 @@ public class ContainerChillModeRule implements
} }
} }
}); });
}
@Override
public void onMessage(NodeRegistrationContainerReport
nodeRegistrationContainerReport, EventPublisher publisher) {
// TODO: when we have remove handlers, we can remove getInChillmode check
if (chillModeManager.getInChillMode()) {
if (validate()) {
return;
}
process(nodeRegistrationContainerReport);
if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. {} % containers have at least one"
+ " reported replica.",
(containerWithMinReplicas.get() / maxContainer) * 100);
}
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
}
if (scmInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. {} % containers have at least one"
+ " reported replica.",
(containerWithMinReplicas.doubleValue() / maxContainer) * 100);
} }
} }
@Override @Override
public void cleanup() { protected void cleanup() {
containerMap.clear(); containerMap.clear();
} }
} }

View File

@ -22,19 +22,18 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport; import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer.NodeRegistrationContainerReport;
import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.apache.hadoop.hdds.server.events.EventPublisher;
/** /**
* Class defining Chill mode exit criteria according to number of DataNodes * Class defining Chill mode exit criteria according to number of DataNodes
* registered with SCM. * registered with SCM.
*/ */
public class DataNodeChillModeRule implements public class DataNodeChillModeRule extends
ChillModeExitRule<NodeRegistrationContainerReport>, ChillModeExitRule<NodeRegistrationContainerReport>{
EventHandler<NodeRegistrationContainerReport> {
// Min DataNodes required to exit chill mode. // Min DataNodes required to exit chill mode.
private int requiredDns; private int requiredDns;
@ -42,61 +41,42 @@ public class DataNodeChillModeRule implements
// Set to track registered DataNodes. // Set to track registered DataNodes.
private HashSet<UUID> registeredDnSet; private HashSet<UUID> registeredDnSet;
private final SCMChillModeManager chillModeManager; public DataNodeChillModeRule(String ruleName, EventQueue eventQueue,
Configuration conf,
public DataNodeChillModeRule(Configuration conf,
SCMChillModeManager manager) { SCMChillModeManager manager) {
super(manager, ruleName, eventQueue);
requiredDns = conf.getInt( requiredDns = conf.getInt(
HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE, HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE,
HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT); HddsConfigKeys.HDDS_SCM_CHILLMODE_MIN_DATANODE_DEFAULT);
registeredDnSet = new HashSet<>(requiredDns * 2); registeredDnSet = new HashSet<>(requiredDns * 2);
chillModeManager = manager;
} }
@Override @Override
public boolean validate() { protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
}
@Override
protected boolean validate() {
return registeredDns >= requiredDns; return registeredDns >= requiredDns;
} }
@VisibleForTesting
public double getRegisteredDataNodes() {
return registeredDns;
}
@Override @Override
public void process(NodeRegistrationContainerReport reportsProto) { protected void process(NodeRegistrationContainerReport reportsProto) {
registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid()); registeredDnSet.add(reportsProto.getDatanodeDetails().getUuid());
registeredDns = registeredDnSet.size(); registeredDns = registeredDnSet.size();
} if (scmInChillMode()) {
SCMChillModeManager.getLogger().info(
@Override "SCM in chill mode. {} DataNodes registered, {} required.",
public void onMessage(NodeRegistrationContainerReport registeredDns, requiredDns);
nodeRegistrationContainerReport, EventPublisher publisher) {
// TODO: when we have remove handlers, we can remove getInChillmode check
if (chillModeManager.getInChillMode()) {
if (validate()) {
return;
}
process(nodeRegistrationContainerReport);
if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. {} DataNodes registered, {} required.",
registeredDns, requiredDns);
}
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
}
} }
} }
@Override @Override
public void cleanup() { protected void cleanup() {
registeredDnSet.clear(); registeredDnSet.clear();
} }
} }

View File

@ -17,21 +17,24 @@
*/ */
package org.apache.hadoop.hdds.scm.chillmode; package org.apache.hadoop.hdds.scm.chillmode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,28 +49,33 @@ import java.util.Set;
* through in a cluster. * through in a cluster.
*/ */
public class HealthyPipelineChillModeRule public class HealthyPipelineChillModeRule
implements ChillModeExitRule<PipelineReportFromDatanode>, extends ChillModeExitRule<PipelineReportFromDatanode>{
EventHandler<PipelineReportFromDatanode> {
public static final Logger LOG = public static final Logger LOG =
LoggerFactory.getLogger(HealthyPipelineChillModeRule.class); LoggerFactory.getLogger(HealthyPipelineChillModeRule.class);
private final PipelineManager pipelineManager; private final PipelineManager pipelineManager;
private final SCMChillModeManager chillModeManager;
private final int healthyPipelineThresholdCount; private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0; private int currentHealthyPipelineCount = 0;
private final Set<DatanodeDetails> processedDatanodeDetails = private final Set<DatanodeDetails> processedDatanodeDetails =
new HashSet<>(); new HashSet<>();
HealthyPipelineChillModeRule(PipelineManager pipelineManager, HealthyPipelineChillModeRule(String ruleName, EventQueue eventQueue,
PipelineManager pipelineManager,
SCMChillModeManager manager, Configuration configuration) { SCMChillModeManager manager, Configuration configuration) {
super(manager, ruleName, eventQueue);
this.pipelineManager = pipelineManager; this.pipelineManager = pipelineManager;
this.chillModeManager = manager;
double healthyPipelinesPercent = double healthyPipelinesPercent =
configuration.getDouble(HddsConfigKeys. configuration.getDouble(HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
HddsConfigKeys. HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT); HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
Preconditions.checkArgument(
(healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
+ " value should be >= 0.0 and <= 1.0");
// As we want to wait for 3 node pipelines // As we want to wait for 3 node pipelines
int pipelineCount = int pipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
@ -84,7 +92,12 @@ public class HealthyPipelineChillModeRule
} }
@Override @Override
public boolean validate() { protected TypedEvent<PipelineReportFromDatanode> getEventType() {
return SCMEvents.PROCESSED_PIPELINE_REPORT;
}
@Override
protected boolean validate() {
if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) { if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
return true; return true;
} }
@ -92,59 +105,40 @@ public class HealthyPipelineChillModeRule
} }
@Override @Override
public void process(PipelineReportFromDatanode pipelineReportFromDatanode) { protected void process(PipelineReportFromDatanode
Pipeline pipeline; pipelineReportFromDatanode) {
Preconditions.checkNotNull(pipelineReportFromDatanode);
PipelineReportsProto pipelineReport =
pipelineReportFromDatanode.getReport();
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
PipelineID pipelineID = PipelineID
.getFromProtobuf(report.getPipelineID());
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
continue;
}
if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
// If the pipeline is open state mean, all 3 datanodes are reported
// for this pipeline.
currentHealthyPipelineCount++;
}
}
}
@Override
public void cleanup() {
// No need to deal with
}
@Override
public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
EventPublisher publisher) {
// If we have already reached healthy pipeline threshold, skip processing
// pipeline report from datanode.
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
return;
}
// When SCM is in chill mode for long time, already registered // When SCM is in chill mode for long time, already registered
// datanode can send pipeline report again, then pipeline handler fires // datanode can send pipeline report again, then pipeline handler fires
// processed report event, we should not consider this pipeline report // processed report event, we should not consider this pipeline report
// from datanode again during threshold calculation. // from datanode again during threshold calculation.
Preconditions.checkNotNull(pipelineReportFromDatanode);
DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails(); DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
if (!processedDatanodeDetails.contains( if (!processedDatanodeDetails.contains(
pipelineReportFromDatanode.getDatanodeDetails())) { pipelineReportFromDatanode.getDatanodeDetails())) {
// Process pipeline report from datanode Pipeline pipeline;
process(pipelineReportFromDatanode); PipelineReportsProto pipelineReport =
pipelineReportFromDatanode.getReport();
if (chillModeManager.getInChillMode()) { for (PipelineReport report : pipelineReport.getPipelineReportList()) {
PipelineID pipelineID = PipelineID
.getFromProtobuf(report.getPipelineID());
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
continue;
}
if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
// If the pipeline is open state mean, all 3 datanodes are reported
// for this pipeline.
currentHealthyPipelineCount++;
}
}
if (scmInChillMode()) {
SCMChillModeManager.getLogger().info( SCMChillModeManager.getLogger().info(
"SCM in chill mode. Healthy pipelines reported count is {}, " + "SCM in chill mode. Healthy pipelines reported count is {}, " +
"required healthy pipeline reported count is {}", "required healthy pipeline reported count is {}",
@ -154,8 +148,20 @@ public class HealthyPipelineChillModeRule
processedDatanodeDetails.add(dnDetails); processedDatanodeDetails.add(dnDetails);
} }
if (validate()) { }
chillModeManager.validateChillModeExitRules(publisher);
} @Override
protected void cleanup() {
processedDatanodeDetails.clear();
}
@VisibleForTesting
public int getCurrentHealthyPipelineCount() {
return currentHealthyPipelineCount;
}
@VisibleForTesting
public int getHealthyPipelineThresholdCount() {
return healthyPipelineThresholdCount;
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.hdds.scm.chillmode; package org.apache.hadoop.hdds.scm.chillmode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
@ -25,14 +26,15 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport; .StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher. import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
PipelineReportFromDatanode; PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.TypedEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -40,13 +42,12 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
/** /**
* This rule covers whether we have atleast one datanode is reported for each * This rule covers whether we have at least one datanode is reported for each
* pipeline. This rule is for all open containers, we have at least one * pipeline. This rule is for all open containers, we have at least one
* replica available for read when we exit chill mode. * replica available for read when we exit chill mode.
*/ */
public class OneReplicaPipelineChillModeRule implements public class OneReplicaPipelineChillModeRule extends
ChillModeExitRule<PipelineReportFromDatanode>, ChillModeExitRule<PipelineReportFromDatanode> {
EventHandler<PipelineReportFromDatanode> {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(OneReplicaPipelineChillModeRule.class); LoggerFactory.getLogger(OneReplicaPipelineChillModeRule.class);
@ -54,12 +55,13 @@ public class OneReplicaPipelineChillModeRule implements
private int thresholdCount; private int thresholdCount;
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>(); private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
private final PipelineManager pipelineManager; private final PipelineManager pipelineManager;
private final SCMChillModeManager chillModeManager; private int currentReportedPipelineCount = 0;
public OneReplicaPipelineChillModeRule(PipelineManager pipelineManager,
SCMChillModeManager chillModeManager, public OneReplicaPipelineChillModeRule(String ruleName, EventQueue eventQueue,
Configuration configuration) { PipelineManager pipelineManager,
this.chillModeManager = chillModeManager; SCMChillModeManager chillModeManager, Configuration configuration) {
super(chillModeManager, ruleName, eventQueue);
this.pipelineManager = pipelineManager; this.pipelineManager = pipelineManager;
double percent = double percent =
@ -68,6 +70,11 @@ public class OneReplicaPipelineChillModeRule implements
HddsConfigKeys. HddsConfigKeys.
HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT); HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT);
Preconditions.checkArgument((percent >= 0.0 && percent <= 1.0),
HddsConfigKeys.
HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT +
" value should be >= 0.0 and <= 1.0");
int totalPipelineCount = int totalPipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS, pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE).size(); HddsProtos.ReplicationFactor.THREE).size();
@ -79,16 +86,23 @@ public class OneReplicaPipelineChillModeRule implements
thresholdCount); thresholdCount);
} }
@Override @Override
public boolean validate() { protected TypedEvent<PipelineReportFromDatanode> getEventType() {
if (reportedPipelineIDSet.size() >= thresholdCount) { return SCMEvents.PROCESSED_PIPELINE_REPORT;
}
@Override
protected boolean validate() {
if (currentReportedPipelineCount >= thresholdCount) {
return true; return true;
} }
return false; return false;
} }
@Override @Override
public void process(PipelineReportFromDatanode pipelineReportFromDatanode) { protected void process(PipelineReportFromDatanode
pipelineReportFromDatanode) {
Pipeline pipeline; Pipeline pipeline;
Preconditions.checkNotNull(pipelineReportFromDatanode); Preconditions.checkNotNull(pipelineReportFromDatanode);
PipelineReportsProto pipelineReport = PipelineReportsProto pipelineReport =
@ -108,35 +122,32 @@ public class OneReplicaPipelineChillModeRule implements
reportedPipelineIDSet.add(pipelineID); reportedPipelineIDSet.add(pipelineID);
} }
} }
}
@Override currentReportedPipelineCount = reportedPipelineIDSet.size();
public void cleanup() {
reportedPipelineIDSet.clear();
}
@Override if (scmInChillMode()) {
public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
EventPublisher publisher) {
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
return;
}
// Process pipeline report from datanode
process(pipelineReportFromDatanode);
if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info( SCMChillModeManager.getLogger().info(
"SCM in chill mode. Pipelines with atleast one datanode reported " + "SCM in chill mode. Pipelines with atleast one datanode reported " +
"count is {}, required atleast one datanode reported per " + "count is {}, required atleast one datanode reported per " +
"pipeline count is {}", "pipeline count is {}",
reportedPipelineIDSet.size(), thresholdCount); currentReportedPipelineCount, thresholdCount);
} }
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
}
} }
@Override
protected void cleanup() {
reportedPipelineIDSet.clear();
}
@VisibleForTesting
public int getThresholdCount() {
return thresholdCount;
}
@VisibleForTesting
public int getCurrentReportedPipelineCount() {
return currentReportedPipelineCount;
}
} }

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.hdds.scm.chillmode; package org.apache.hadoop.hdds.scm.chillmode;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
@ -39,10 +42,38 @@ import org.slf4j.LoggerFactory;
* *
* ChillModeExitRule defines format to define new rules which must be satisfied * ChillModeExitRule defines format to define new rules which must be satisfied
* to exit Chill mode. * to exit Chill mode.
* ContainerChillModeRule defines the only exit criteria right now. *
* On every new datanode registration event this class adds replicas * Current ChillMode rules:
* for reported containers and validates if cutoff threshold for * 1. ContainerChillModeRule:
* containers is meet. * On every new datanode registration, it fires
* {@link SCMEvents#NODE_REGISTRATION_CONT_REPORT}. This rule handles this
* event. This rule process this report, increment the
* containerWithMinReplicas count when this reported replica is in the
* containerMap. Then validates if cutoff threshold for containers is meet.
*
* 2. DatanodeChillModeRule:
* On every new datanode registration, it fires
* {@link SCMEvents#NODE_REGISTRATION_CONT_REPORT}. This rule handles this
* event. This rule process this report, and check if this is new node, add
* to its reported node list. Then validate it cutoff threshold for minimum
* number of datanode registered is met or not.
*
* 3. HealthyPipelineChillModeRule:
* Once the pipelineReportHandler processes the
* {@link SCMEvents#PIPELINE_REPORT}, it fires
* {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
* event. This rule processes this report, and check if pipeline is healthy
* and increments current healthy pipeline count. Then validate it cutoff
* threshold for healthy pipeline is met or not.
*
* 4. OneReplicaPipelineChillModeRule:
* Once the pipelineReportHandler processes the
* {@link SCMEvents#PIPELINE_REPORT}, it fires
* {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
* event. This rule processes this report, and add the reported pipeline to
* reported pipeline set. Then validate it cutoff threshold for one replica
* per pipeline is met or not.
*
*/ */
public class SCMChillModeManager { public class SCMChillModeManager {
@ -60,6 +91,8 @@ public class SCMChillModeManager {
private static final String ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE = private static final String ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE =
"AtleastOneDatanodeReportedRule"; "AtleastOneDatanodeReportedRule";
private Set<String> validatedRules = new HashSet<>();
private final EventQueue eventPublisher; private final EventQueue eventPublisher;
private final PipelineManager pipelineManager; private final PipelineManager pipelineManager;
@ -75,30 +108,27 @@ public class SCMChillModeManager {
if (isChillModeEnabled) { if (isChillModeEnabled) {
ContainerChillModeRule containerChillModeRule = ContainerChillModeRule containerChillModeRule =
new ContainerChillModeRule(config, allContainers, this); new ContainerChillModeRule(CONT_EXIT_RULE, eventQueue, config,
allContainers, this);
DataNodeChillModeRule dataNodeChillModeRule = DataNodeChillModeRule dataNodeChillModeRule =
new DataNodeChillModeRule(config, this); new DataNodeChillModeRule(DN_EXIT_RULE, eventQueue, config, this);
exitRules.put(CONT_EXIT_RULE, containerChillModeRule); exitRules.put(CONT_EXIT_RULE, containerChillModeRule);
exitRules.put(DN_EXIT_RULE, dataNodeChillModeRule); exitRules.put(DN_EXIT_RULE, dataNodeChillModeRule);
eventPublisher.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
containerChillModeRule);
eventPublisher.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
dataNodeChillModeRule);
if (conf.getBoolean( if (conf.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT) HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT)
&& pipelineManager != null) { && pipelineManager != null) {
HealthyPipelineChillModeRule rule = new HealthyPipelineChillModeRule( HealthyPipelineChillModeRule healthyPipelineChillModeRule =
pipelineManager, this, config); new HealthyPipelineChillModeRule(HEALTHY_PIPELINE_EXIT_RULE,
eventQueue, pipelineManager,
this, config);
OneReplicaPipelineChillModeRule oneReplicaPipelineChillModeRule = OneReplicaPipelineChillModeRule oneReplicaPipelineChillModeRule =
new OneReplicaPipelineChillModeRule(pipelineManager, this, conf); new OneReplicaPipelineChillModeRule(
exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, rule); ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, eventQueue,
pipelineManager, this, conf);
exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, healthyPipelineChillModeRule);
exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE, exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
oneReplicaPipelineChillModeRule); oneReplicaPipelineChillModeRule);
eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, rule);
eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT,
oneReplicaPipelineChillModeRule);
} }
emitChillModeStatus(); emitChillModeStatus();
} else { } else {
@ -115,13 +145,24 @@ public class SCMChillModeManager {
new ChillModeStatus(getInChillMode())); new ChillModeStatus(getInChillMode()));
} }
public void validateChillModeExitRules(EventPublisher eventQueue) {
for (ChillModeExitRule exitRule : exitRules.values()) { public synchronized void validateChillModeExitRules(String ruleName,
if (!exitRule.validate()) { EventPublisher eventQueue) {
return;
} if (exitRules.get(ruleName) != null) {
validatedRules.add(ruleName);
} else {
// This should never happen
LOG.error("No Such Exit rule {}", ruleName);
} }
exitChillMode(eventQueue);
if (validatedRules.size() == exitRules.size()) {
// All rules are satisfied, we can exit chill mode.
LOG.info("ScmChillModeManager, all rules are successfully validated");
exitChillMode(eventQueue);
}
} }
/** /**
@ -140,9 +181,6 @@ public class SCMChillModeManager {
// TODO: Remove handler registration as there is no need to listen to // TODO: Remove handler registration as there is no need to listen to
// register events anymore. // register events anymore.
for (ChillModeExitRule e : exitRules.values()) {
e.cleanup();
}
emitChillModeStatus(); emitChillModeStatus();
// TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline // TODO: #CLUTIL if we reenter chill mode the fixed interval pipeline
// creation job needs to stop // creation job needs to stop

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.chillmode;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
@ -37,12 +38,17 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.pipeline.*; import org.apache.hadoop.hdds.scm.pipeline.*;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout; import org.junit.rules.Timeout;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -56,7 +62,10 @@ public class TestSCMChillModeManager {
private List<ContainerInfo> containers; private List<ContainerInfo> containers;
@Rule @Rule
public Timeout timeout = new Timeout(1000 * 35); public Timeout timeout = new Timeout(1000 * 300);
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
@BeforeClass @BeforeClass
public static void setUp() { public static void setUp() {
@ -124,6 +133,185 @@ public class TestSCMChillModeManager {
}, 100, 1000 * 5); }, 100, 1000 * 5);
} }
private OzoneConfiguration createConf(double healthyPercent,
double oneReplicaPercent) throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
tempDir.newFolder().toString());
conf.setBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK,
true);
conf.setDouble(HddsConfigKeys.
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
conf.setDouble(HddsConfigKeys.
HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
return conf;
}
@Test
public void testChillModeExitRuleWithPipelineAvailabilityCheck()
throws Exception{
testChillModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0.90, 1);
testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0.10, 0.9);
testChillModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0, 0.9);
testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0);
testChillModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0.5);
}
@Test
public void testFailWithIncorrectValueForHealthyPipelinePercent()
throws Exception {
try {
OzoneConfiguration conf = createConf(100,
0.9);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue);
scmChillModeManager = new SCMChillModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForHealthyPipelinePercent");
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
" 1.0", ex);
}
}
@Test
public void testFailWithIncorrectValueForOneReplicaPipelinePercent()
throws Exception {
try {
OzoneConfiguration conf = createConf(0.9,
200);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue);
scmChillModeManager = new SCMChillModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
" 1.0", ex);
}
}
@Test
public void testFailWithIncorrectValueForChillModePercent() throws Exception {
try {
OzoneConfiguration conf = createConf(0.9, 0.1);
conf.setDouble(HddsConfigKeys.HDDS_SCM_CHILLMODE_THRESHOLD_PCT, -1.0);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue);
scmChillModeManager = new SCMChillModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForChillModePercent");
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
" 1.0", ex);
}
}
public void testChillModeExitRuleWithPipelineAvailabilityCheck(
int containerCount, int nodeCount, int pipelineCount,
double healthyPipelinePercent, double oneReplicaPercent)
throws Exception {
OzoneConfiguration conf = createConf(healthyPipelinePercent,
oneReplicaPercent);
containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(containerCount));
MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
mockNodeManager, queue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
pipelineManager.getStateManager(), config);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
for (int i=0; i < pipelineCount; i++) {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
}
for (ContainerInfo container : containers) {
container.setState(HddsProtos.LifeCycleState.CLOSED);
}
scmChillModeManager = new SCMChillModeManager(conf, containers,
pipelineManager, queue);
assertTrue(scmChillModeManager.getInChillMode());
testContainerThreshold(containers, 1.0);
List<Pipeline> pipelines = pipelineManager.getPipelines();
int healthyPipelineThresholdCount =
scmChillModeManager.getHealthyPipelineChillModeRule()
.getHealthyPipelineThresholdCount();
int oneReplicaThresholdCount =
scmChillModeManager.getOneReplicaPipelineChillModeRule()
.getThresholdCount();
// Because even if no pipelines are there, and threshold we set to zero,
// we shall a get an event when datanode is registered. In that case,
// validate will return true, and add this to validatedRules.
if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
firePipelineEvent(pipelines.get(0));
}
for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
oneReplicaThresholdCount); i++) {
firePipelineEvent(pipelines.get(i));
if (i < healthyPipelineThresholdCount) {
checkHealthy(i + 1);
}
if (i < oneReplicaThresholdCount) {
checkOpen(i + 1);
}
}
GenericTestUtils.waitFor(() -> {
return !scmChillModeManager.getInChillMode();
}, 100, 1000 * 5);
}
private void checkHealthy(int expectedCount) throws Exception{
GenericTestUtils.waitFor(() -> scmChillModeManager
.getHealthyPipelineChillModeRule()
.getCurrentHealthyPipelineCount() == expectedCount,
100, 5000);
}
private void checkOpen(int expectedCount) throws Exception {
GenericTestUtils.waitFor(() -> scmChillModeManager
.getOneReplicaPipelineChillModeRule()
.getCurrentReportedPipelineCount() == expectedCount,
1000, 5000);
}
private void firePipelineEvent(Pipeline pipeline) throws Exception {
PipelineReportsProto.Builder reportBuilder =
PipelineReportsProto.newBuilder();
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
.setPipelineID(pipeline.getId().getProtobuf()));
queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new PipelineReportFromDatanode(pipeline.getNodes().get(0),
reportBuilder.build()));
}
@Test @Test
public void testDisableChillMode() { public void testDisableChillMode() {
OzoneConfiguration conf = new OzoneConfiguration(config); OzoneConfiguration conf = new OzoneConfiguration(config);