HDDS-1182. Pipeline Rule where at least one datanode is reported in the pipeline.

Bharat Viswanadham 2019-02-28 21:41:36 -08:00 committed by GitHub
parent eae3db9649
commit 77b23c816e
6 changed files with 394 additions and 0 deletions
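
In outline, the new rule tracks which replication-factor-THREE pipelines have had at least one of their datanodes report them, and allows chill mode exit once a configurable percentage of those pipelines is covered. A minimal sketch of the core check, with illustrative (not committed) variable names:

    // Threshold is derived from the configured percentage of RATIS/THREE pipelines.
    int totalFactorThreePipelines = pipelineManager
        .getPipelines(HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE).size();
    int thresholdCount = (int) Math.ceil(percent * totalFactorThreePipelines);

    // A pipeline counts as "reported" once any one of its datanodes reports it.
    Set<PipelineID> reportedPipelines = new HashSet<>();

    // The rule is satisfied when enough distinct pipelines have been reported.
    boolean canExitChillMode = reportedPipelines.size() >= thresholdCount;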

@@ -89,6 +89,11 @@ public final class HddsConfigKeys {
public static final double
HDDS_SCM_CHILLMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
public static final String HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
"hdds.scm.chillmode.atleast.one.node.reported.pipeline.pct";
public static final double
HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT = 0.90;
public static final String HDDS_LOCK_MAX_CONCURRENCY =
"hdds.lock.max.concurrency";
public static final int HDDS_LOCK_MAX_CONCURRENCY_DEFAULT = 100;

@@ -1325,6 +1325,16 @@
</description>
</property>
<property>
<name>hdds.scm.chillmode.atleast.one.node.reported.pipeline.pct</name>
<value>0.90</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Percentage of pipelines for which at least one datanode must have
reported the pipeline before this chill mode exit rule is satisfied.
</description>
</property>
<property>
<name>hdds.container.action.max.limit</name>
<value>20</value>
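
The default of 0.90 is declared above; deployments normally override it in ozone-site.xml. As a small illustration (the key is the one added by this commit, while the 0.75 value is only an example), the same setting can also be applied programmatically:

    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    OzoneConfiguration conf = new OzoneConfiguration();
    // Example only: require 75% of factor-THREE pipelines to have at least
    // one datanode reported before this exit rule is satisfied.
    conf.setDouble(
        HddsConfigKeys.HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT, 0.75);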

@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.chillmode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Set;
/**
* This rule checks whether at least one datanode has reported each
* replication-factor-THREE pipeline, so that, for all open containers, at
* least one replica is available for read when we exit chill mode.
*/
public class OneReplicaPipelineChillModeRule implements
ChillModeExitRule<PipelineReportFromDatanode>,
EventHandler<PipelineReportFromDatanode> {
private static final Logger LOG =
LoggerFactory.getLogger(OneReplicaPipelineChillModeRule.class);
private int thresholdCount;
private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
private final PipelineManager pipelineManager;
private final SCMChillModeManager chillModeManager;
public OneReplicaPipelineChillModeRule(PipelineManager pipelineManager,
SCMChillModeManager chillModeManager,
Configuration configuration) {
this.chillModeManager = chillModeManager;
this.pipelineManager = pipelineManager;
double percent =
configuration.getDouble(
HddsConfigKeys.HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT,
HddsConfigKeys.
HDDS_SCM_CHILLMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT);
int totalPipelineCount =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE).size();
thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
"datanode reported threshold count is {}", totalPipelineCount,
thresholdCount);
}
@Override
public boolean validate() {
return reportedPipelineIDSet.size() >= thresholdCount;
}
@Override
public void process(PipelineReportFromDatanode pipelineReportFromDatanode) {
Pipeline pipeline;
Preconditions.checkNotNull(pipelineReportFromDatanode);
PipelineReportsProto pipelineReport =
pipelineReportFromDatanode.getReport();
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
PipelineID pipelineID = PipelineID
.getFromProtobuf(report.getPipelineID());
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
continue;
}
if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
!reportedPipelineIDSet.contains(pipelineID)) {
reportedPipelineIDSet.add(pipelineID);
}
}
}
@Override
public void cleanup() {
reportedPipelineIDSet.clear();
}
@Override
public void onMessage(PipelineReportFromDatanode pipelineReportFromDatanode,
EventPublisher publisher) {
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
return;
}
// Process pipeline report from datanode
process(pipelineReportFromDatanode);
if (chillModeManager.getInChillMode()) {
SCMChillModeManager.getLogger().info(
"SCM in chill mode. Pipelines with atleast one datanode reported " +
"count is {}, required atleast one datanode reported per " +
"pipeline count is {}",
reportedPipelineIDSet.size(), thresholdCount);
}
if (validate()) {
chillModeManager.validateChillModeExitRules(publisher);
}
}
}
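
A note on the threshold arithmetic above: because Math.ceil is applied to percent * totalPipelineCount, small pipeline counts round up sharply. At the default of 0.90, for instance, every one of 7 factor-THREE pipelines must be reported, while 9 of 10 would suffice:

    // ceil(0.90 * 7)  -> 7 : all 7 pipelines need a reporting datanode
    // ceil(0.90 * 10) -> 9 : 9 of 10 pipelines need a reporting datanode
    int thresholdForSeven = (int) Math.ceil(0.90 * 7);   // 7
    int thresholdForTen = (int) Math.ceil(0.90 * 10);    // 9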

@@ -62,6 +62,8 @@ public class SCMChillModeManager implements
private static final String DN_EXIT_RULE = "DataNodeChillModeRule";
private static final String HEALTHY_PIPELINE_EXIT_RULE =
"HealthyPipelineChillModeRule";
private static final String ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE =
"AtleastOneDatanodeReportedRule";
private final EventQueue eventPublisher;
private final PipelineManager pipelineManager;
@@ -86,8 +88,14 @@ public class SCMChillModeManager implements
&& pipelineManager != null) {
HealthyPipelineChillModeRule rule = new HealthyPipelineChillModeRule(
pipelineManager, this, config);
OneReplicaPipelineChillModeRule oneReplicaPipelineChillModeRule =
new OneReplicaPipelineChillModeRule(pipelineManager, this, conf);
exitRules.put(HEALTHY_PIPELINE_EXIT_RULE, rule);
exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
oneReplicaPipelineChillModeRule);
eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT, rule);
eventPublisher.addHandler(SCMEvents.PROCESSED_PIPELINE_REPORT,
oneReplicaPipelineChillModeRule);
}
emitChillModeStatus();
} else {
@@ -179,4 +187,10 @@ public class SCMChillModeManager implements
exitRules.get(HEALTHY_PIPELINE_EXIT_RULE);
}
@VisibleForTesting
public OneReplicaPipelineChillModeRule getOneReplicaPipelineChillModeRule() {
return (OneReplicaPipelineChillModeRule)
exitRules.get(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE);
}
}

@@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.chillmode;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* This class tests OneReplicaPipelineChillModeRule.
*/
public class TestOneReplicaPipelineChillModeRule {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private OneReplicaPipelineChillModeRule rule;
private PipelineManager pipelineManager;
private EventQueue eventQueue;
private void setup(int nodes, int pipelineFactorThreeCount,
int pipelineFactorOneCount) throws Exception {
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.setBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_PIPELINE_AVAILABILITY_CHECK, true);
ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
folder.newFolder().toString());
List<ContainerInfo> containers = new ArrayList<>();
containers.addAll(HddsTestUtils.getContainerInfo(1));
MockNodeManager mockNodeManager = new MockNodeManager(true, nodes);
eventQueue = new EventQueue();
pipelineManager =
new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
eventQueue);
createPipelines(pipelineFactorThreeCount,
HddsProtos.ReplicationFactor.THREE);
createPipelines(pipelineFactorOneCount,
HddsProtos.ReplicationFactor.ONE);
SCMChillModeManager scmChillModeManager =
new SCMChillModeManager(ozoneConfiguration, containers,
pipelineManager, eventQueue);
rule = scmChillModeManager.getOneReplicaPipelineChillModeRule();
}
@Test
public void testOneReplicaPipelineRule() throws Exception {
// With 30 nodes we can create 7 pipelines with replication factor 3.
// (In the mock node manager, for every 10 nodes, 7 are healthy,
// 2 are stale and 1 is dead.)
int nodes = 30;
int pipelineFactorThreeCount = 7;
int pipelineCountOne = 0;
setup(nodes, pipelineFactorThreeCount, pipelineCountOne);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(
LoggerFactory.getLogger(SCMChillModeManager.class));
List<Pipeline> pipelines = pipelineManager.getPipelines();
for (int i = 0; i < pipelineFactorThreeCount - 1; i++) {
firePipelineEvent(pipelines.get(i));
}
// Since ceil(90% of 7) is 7, the rule's validate() should still return
// false after only 6 pipeline reports have been sent.
GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
"reported count is 6"), 1000, 5000);
Assert.assertFalse(rule.validate());
// Fire the last pipeline event from a datanode.
firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
}
@Test
public void testOneReplicaPipelineRuleMixedPipelines() throws Exception {
// With 30 nodes we can create 7 pipelines with replication factor 3.
// (In the mock node manager, for every 10 nodes, 7 are healthy,
// 2 are stale and 1 is dead.)
int nodes = 30;
int pipelineCountThree = 7;
int pipelineCountOne = 21;
setup(nodes, pipelineCountThree, pipelineCountOne);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(
LoggerFactory.getLogger(SCMChillModeManager.class));
List<Pipeline> pipelines =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.ONE);
for (int i = 0; i < pipelineCountOne; i++) {
firePipelineEvent(pipelines.get(i));
}
GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
"reported count is 0"), 1000, 5000);
// We only fired events for factor-ONE Ratis pipelines, so the rule's
// validate() should still return false.
Assert.assertFalse(rule.validate());
pipelines =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
for (int i = 0; i < pipelineCountThree - 1; i++) {
firePipelineEvent(pipelines.get(i));
}
GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
"reported count is 6"), 1000, 5000);
// Fire the last pipeline event from a datanode.
firePipelineEvent(pipelines.get(pipelineCountThree - 1));
GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
}
private void createPipelines(int count,
HddsProtos.ReplicationFactor factor) throws Exception {
for (int i = 0; i < count; i++) {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
factor);
}
}
private void firePipelineEvent(Pipeline pipeline) {
PipelineReportsProto.Builder reportBuilder =
PipelineReportsProto.newBuilder();
reportBuilder.addPipelineReport(PipelineReport.newBuilder()
.setPipelineID(pipeline.getId().getProtobuf()));
if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
pipeline.getNodes().get(0), reportBuilder.build()));
eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
pipeline.getNodes().get(1), reportBuilder.build()));
eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
pipeline.getNodes().get(2), reportBuilder.build()));
} else {
eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
pipeline.getNodes().get(0), reportBuilder.build()));
}
}
}

@@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.chillmode;
/**
* SCM Chill mode tests.
*/