From f2fb6536dcbe6320f69273bf9e11d4701248172c Mon Sep 17 00:00:00 2001
From: Yiqun Lin
Date: Mon, 18 Feb 2019 22:35:23 +0800
Subject: [PATCH] HDDS-1106. Introduce queryMap in PipelineManager. Contributed by Lokesh Jain.

---
 .../hdds/scm/pipeline/PipelineStateMap.java   | 72 ++++++++++++++++++-
 .../pipeline/TestPipelineStateManager.java    | 42 +++++++++++
 2 files changed, 111 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index dea2115030a..2b6c61b1e84 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
@@ -42,15 +44,27 @@ class PipelineStateMap {
 
   private final Map<PipelineID, Pipeline> pipelineMap;
   private final Map<PipelineID, NavigableSet<ContainerID>> pipeline2container;
+  private final Map<PipelineQuery, List<Pipeline>> query2OpenPipelines;
 
   PipelineStateMap() {
 
     // TODO: Use TreeMap for range operations?
-    this.pipelineMap = new HashMap<>();
-    this.pipeline2container = new HashMap<>();
+    pipelineMap = new HashMap<>();
+    pipeline2container = new HashMap<>();
+    query2OpenPipelines = new HashMap<>();
+    initializeQueryMap();
 
   }
 
+  private void initializeQueryMap() {
+    for (ReplicationType type : ReplicationType.values()) {
+      for (ReplicationFactor factor : ReplicationFactor.values()) {
+        query2OpenPipelines
+            .put(new PipelineQuery(type, factor), new CopyOnWriteArrayList<>());
+      }
+    }
+  }
+
   /**
    * Adds provided pipeline in the data structures.
    *
@@ -70,6 +84,9 @@ class PipelineStateMap {
           .format("Duplicate pipeline ID %s detected.", pipeline.getId()));
     }
     pipeline2container.put(pipeline.getId(), new TreeSet<>());
+    if (pipeline.getPipelineState() == PipelineState.OPEN) {
+      query2OpenPipelines.get(new PipelineQuery(pipeline)).add(pipeline);
+    }
   }
 
   /**
@@ -188,6 +205,10 @@
     Preconditions.checkNotNull(factor, "Replication factor cannot be null");
     Preconditions.checkNotNull(state, "Pipeline state cannot be null");
 
+    if (state == PipelineState.OPEN) {
+      return Collections.unmodifiableList(
+          query2OpenPipelines.get(new PipelineQuery(type, factor)));
+    }
     return pipelineMap.values().stream().filter(
         pipeline -> pipeline.getType() == type
             && pipeline.getPipelineState() == state
@@ -293,7 +314,52 @@
     Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
 
     final Pipeline pipeline = getPipeline(pipelineID);
-    return pipelineMap.compute(pipelineID,
+    Pipeline updatedPipeline = pipelineMap.compute(pipelineID,
         (id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
+    PipelineQuery query = new PipelineQuery(pipeline);
+    if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
+      // for transition to OPEN state add pipeline to query2OpenPipelines
+      query2OpenPipelines.get(query).add(updatedPipeline);
+    } else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) {
+      // for transition from OPEN to CLOSED state remove pipeline from
+      // query2OpenPipelines
+      query2OpenPipelines.get(query).remove(pipeline);
+    }
+    return updatedPipeline;
+  }
+
+  private class PipelineQuery {
+    private ReplicationType type;
+    private ReplicationFactor factor;
+
+    PipelineQuery(ReplicationType type, ReplicationFactor factor) {
+      this.type = type;
+      this.factor = factor;
+    }
+
+    PipelineQuery(Pipeline pipeline) {
+      type = pipeline.getType();
+      factor = pipeline.getFactor();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+      if (!this.getClass().equals(other.getClass())) {
+        return false;
+      }
+      PipelineQuery otherQuery = (PipelineQuery) other;
+      return type == otherQuery.type && factor == otherQuery.factor;
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder()
+          .append(type)
+          .append(factor)
+          .toHashCode();
+    }
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 823cd7de793..33dd7df9181 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -419,6 +419,48 @@ public class TestPipelineStateManager {
     removePipeline(pipeline);
   }
 
+  @Test
+  public void testQueryPipeline() throws IOException {
+    Pipeline pipeline = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE, 3);
+    // pipeline in allocated state should not be reported
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(0, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    // pipeline in open state should be reported
+    stateManager.openPipeline(pipeline.getId());
+    Assert.assertEquals(1, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    Pipeline pipeline2 = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE, 3);
+    pipeline2 = Pipeline.newBuilder(pipeline2)
+        .setState(Pipeline.PipelineState.OPEN)
+        .build();
+    // pipeline in open state should be reported
+    stateManager.addPipeline(pipeline2);
+    Assert.assertEquals(2, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    // pipeline in closed state should not be reported
+    stateManager.finalizePipeline(pipeline2.getId());
+    Assert.assertEquals(1, stateManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size());
+
+    // clean up
+    removePipeline(pipeline);
+    removePipeline(pipeline2);
+  }
+
   private void removePipeline(Pipeline pipeline) throws IOException {
     stateManager.finalizePipeline(pipeline.getId());
     stateManager.removePipeline(pipeline.getId());
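
Not part of the patch above, but a minimal, standalone sketch of the query-map idea it introduces: pre-create one CopyOnWriteArrayList per (ReplicationType, ReplicationFactor) key so that asking for OPEN pipelines is a single map lookup instead of a scan over every pipeline, and mutate that list on open/close transitions. The Type, Factor, Query and QueryMapSketch names below are illustrative stand-ins, not the real HDDS classes.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

/** Illustrative stand-ins for HddsProtos.ReplicationType/ReplicationFactor. */
enum Type { RATIS, STAND_ALONE }
enum Factor { ONE, THREE }

/** Map key combining type and factor, mirroring the PipelineQuery idea. */
final class Query {
  private final Type type;
  private final Factor factor;

  Query(Type type, Factor factor) {
    this.type = type;
    this.factor = factor;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    Query q = (Query) other;
    return type == q.type && factor == q.factor;
  }

  @Override
  public int hashCode() {
    return Objects.hash(type, factor);
  }
}

public class QueryMapSketch {
  // One pre-created list per (type, factor) key, so get() never returns null
  // and readers iterate a snapshot while open/close transitions mutate it.
  private final Map<Query, List<String>> query2OpenPipelines = new HashMap<>();

  public QueryMapSketch() {
    for (Type t : Type.values()) {
      for (Factor f : Factor.values()) {
        query2OpenPipelines.put(new Query(t, f), new CopyOnWriteArrayList<>());
      }
    }
  }

  /** Called when a pipeline transitions to OPEN. */
  void onOpen(Type t, Factor f, String pipelineId) {
    query2OpenPipelines.get(new Query(t, f)).add(pipelineId);
  }

  /** Called when an OPEN pipeline transitions to CLOSED. */
  void onClose(Type t, Factor f, String pipelineId) {
    query2OpenPipelines.get(new Query(t, f)).remove(pipelineId);
  }

  /** Constant-time lookup of OPEN pipelines, instead of filtering every entry. */
  List<String> getOpen(Type t, Factor f) {
    return Collections.unmodifiableList(query2OpenPipelines.get(new Query(t, f)));
  }

  public static void main(String[] args) {
    QueryMapSketch sketch = new QueryMapSketch();
    sketch.onOpen(Type.RATIS, Factor.THREE, "pipeline-1");
    System.out.println(sketch.getOpen(Type.RATIS, Factor.THREE)); // [pipeline-1]
    sketch.onClose(Type.RATIS, Factor.THREE, "pipeline-1");
    System.out.println(sketch.getOpen(Type.RATIS, Factor.THREE)); // []
  }
}

The CopyOnWriteArrayList values match the patch's choice of list type: the unmodifiable view handed back to callers stays safe to iterate while state transitions add or remove entries concurrently.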