HDDS-1448 : RatisPipelineProvider should only consider open pipeline while excluding dn for pipeline allocation. (#786)
This commit is contained in:
parent
1d59cc490c
commit
f194540520
|
@ -91,7 +91,9 @@ public class RatisPipelineProvider implements PipelineProvider {
|
||||||
public Pipeline create(ReplicationFactor factor) throws IOException {
|
public Pipeline create(ReplicationFactor factor) throws IOException {
|
||||||
// Get set of datanodes already used for ratis pipeline
|
// Get set of datanodes already used for ratis pipeline
|
||||||
Set<DatanodeDetails> dnsUsed = new HashSet<>();
|
Set<DatanodeDetails> dnsUsed = new HashSet<>();
|
||||||
stateManager.getPipelines(ReplicationType.RATIS, factor)
|
stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
|
||||||
|
p -> p.getPipelineState().equals(PipelineState.OPEN) ||
|
||||||
|
p.getPipelineState().equals(PipelineState.ALLOCATED))
|
||||||
.forEach(p -> dnsUsed.addAll(p.getNodes()));
|
.forEach(p -> dnsUsed.addAll(p.getNodes()));
|
||||||
|
|
||||||
// Get list of healthy nodes
|
// Get list of healthy nodes
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||||
import org.assertj.core.util.Preconditions;
|
import org.assertj.core.util.Preconditions;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
@ -184,7 +185,7 @@ public class MockNodeManager implements NodeManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<DatanodeDetails> getAllNodes() {
|
public List<DatanodeDetails> getAllNodes() {
|
||||||
return null;
|
return new ArrayList<>(nodeMetricMap.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -135,4 +135,67 @@ public class TestRatisPipelineProvider {
|
||||||
Pipeline.PipelineState.OPEN);
|
Pipeline.PipelineState.OPEN);
|
||||||
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
|
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreatePipelinesDnExclude() throws IOException {
|
||||||
|
|
||||||
|
// We have 10 DNs in MockNodeManager.
|
||||||
|
// Use up first 3 DNs for an open pipeline.
|
||||||
|
List<DatanodeDetails> openPiplineDns = nodeManager.getAllNodes()
|
||||||
|
.subList(0, 3);
|
||||||
|
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
|
||||||
|
|
||||||
|
Pipeline openPipeline = Pipeline.newBuilder()
|
||||||
|
.setType(HddsProtos.ReplicationType.RATIS)
|
||||||
|
.setFactor(factor)
|
||||||
|
.setNodes(openPiplineDns)
|
||||||
|
.setState(Pipeline.PipelineState.OPEN)
|
||||||
|
.setId(PipelineID.randomId())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
stateManager.addPipeline(openPipeline);
|
||||||
|
|
||||||
|
// Use up next 3 DNs also for an open pipeline.
|
||||||
|
List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes()
|
||||||
|
.subList(3, 6);
|
||||||
|
Pipeline anotherOpenPipeline = Pipeline.newBuilder()
|
||||||
|
.setType(HddsProtos.ReplicationType.RATIS)
|
||||||
|
.setFactor(factor)
|
||||||
|
.setNodes(moreOpenPiplineDns)
|
||||||
|
.setState(Pipeline.PipelineState.OPEN)
|
||||||
|
.setId(PipelineID.randomId())
|
||||||
|
.build();
|
||||||
|
stateManager.addPipeline(anotherOpenPipeline);
|
||||||
|
|
||||||
|
// Use up next 3 DNs also for a closed pipeline.
|
||||||
|
List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes()
|
||||||
|
.subList(6, 9);
|
||||||
|
Pipeline anotherClosedPipeline = Pipeline.newBuilder()
|
||||||
|
.setType(HddsProtos.ReplicationType.RATIS)
|
||||||
|
.setFactor(factor)
|
||||||
|
.setNodes(closedPiplineDns)
|
||||||
|
.setState(Pipeline.PipelineState.CLOSED)
|
||||||
|
.setId(PipelineID.randomId())
|
||||||
|
.build();
|
||||||
|
stateManager.addPipeline(anotherClosedPipeline);
|
||||||
|
|
||||||
|
Pipeline pipeline = provider.create(factor);
|
||||||
|
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
|
||||||
|
Assert.assertEquals(pipeline.getFactor(), factor);
|
||||||
|
Assert.assertEquals(pipeline.getPipelineState(),
|
||||||
|
Pipeline.PipelineState.OPEN);
|
||||||
|
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
|
||||||
|
List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
|
||||||
|
|
||||||
|
// Pipline nodes cannot be from open pipelines.
|
||||||
|
Assert.assertTrue(
|
||||||
|
pipelineNodes.parallelStream().filter(dn ->
|
||||||
|
(openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
|
||||||
|
.count() == 0);
|
||||||
|
|
||||||
|
// Since we have only 10 DNs, at least 1 pipeline node should have been
|
||||||
|
// from the closed pipeline DN list.
|
||||||
|
Assert.assertTrue(pipelineNodes.parallelStream().filter(
|
||||||
|
closedPiplineDns::contains).count() > 0);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue