From 2a4632d3d7b82980d10cc90cdfc52afd866cebb8 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Sun, 17 Jun 2018 23:48:33 -0700 Subject: [PATCH] HDDS-141. Remove PipeLine Class from SCM and move the data field in the Pipeline to ContainerInfo. Contributed by Shashikant Banerjee. --- .../common/helpers/ContainerInfo.java | 32 ++++ .../container/common/helpers/Pipeline.java | 148 +++++++++++------- .../common/helpers/PipelineChannel.java | 124 --------------- hadoop-hdds/common/src/main/proto/hdds.proto | 8 +- .../scm/container/closer/ContainerCloser.java | 6 +- .../hdds/scm/pipelines/PipelineManager.java | 67 ++++---- .../hdds/scm/pipelines/PipelineSelector.java | 11 +- .../scm/pipelines/ratis/RatisManagerImpl.java | 13 +- .../standalone/StandaloneManagerImpl.java | 8 +- .../hdds/scm/block/TestDeletedBlockLog.java | 8 +- .../hadoop/ozone/TestMiniOzoneCluster.java | 8 +- .../ozone/container/ContainerTestHelper.java | 19 +-- .../genesis/BenchMarkContainerStateMap.java | 11 +- .../genesis/BenchMarkDatanodeDispatcher.java | 6 +- .../apache/hadoop/ozone/scm/cli/SQLCLI.java | 4 +- 15 files changed, 194 insertions(+), 279 deletions(-) delete mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java index 2c38d457284..ee05c8768af 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ContainerInfo.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container.common.helpers; import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; @@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.util.Time; import java.io.IOException; +import java.util.Arrays; import java.util.Comparator; import static java.lang.Math.max; @@ -63,6 +65,13 @@ public class ContainerInfo private String owner; private long containerID; private long deleteTransactionId; + /** + * Allows you to maintain private data on ContainerInfo. This is not + * serialized via protobuf, just allows us to maintain some private data. + */ + @JsonIgnore + private byte[] data; + ContainerInfo( long containerID, HddsProtos.LifeCycleState state, @@ -295,6 +304,29 @@ public class ContainerInfo return WRITER.writeValueAsString(this); } + /** + * Returns private data that is set on this containerInfo. + * + * @return blob, the user can interpret it any way they like. + */ + public byte[] getData() { + if (this.data != null) { + return Arrays.copyOf(this.data, this.data.length); + } else { + return null; + } + } + + /** + * Set private data on ContainerInfo object. + * + * @param data -- private data. + */ + public void setData(byte[] data) { + if (data != null) { + this.data = Arrays.copyOf(data, data.length); + } + } /** * Builder class for ContainerInfo. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java index 87408385ecc..c5794f4c036 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java @@ -27,14 +27,14 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.ser.FilterProvider; import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; import java.util.List; /** @@ -46,7 +46,7 @@ public class Pipeline { static { ObjectMapper mapper = new ObjectMapper(); - String[] ignorableFieldNames = {"data"}; + String[] ignorableFieldNames = {"leaderID", "datanodes"}; FilterProvider filters = new SimpleFilterProvider() .addFilter(PIPELINE_INFO, SimpleBeanPropertyFilter .serializeAllExcept(ignorableFieldNames)); @@ -57,38 +57,66 @@ public class Pipeline { WRITER = mapper.writer(filters); } - private PipelineChannel pipelineChannel; - /** - * Allows you to maintain private data on pipelines. This is not serialized - * via protobuf, just allows us to maintain some private data. - */ @JsonIgnore - private byte[] data; + private String leaderID; + @JsonIgnore + private Map datanodes; + private HddsProtos.LifeCycleState lifeCycleState; + private HddsProtos.ReplicationType type; + private HddsProtos.ReplicationFactor factor; + private String name; + // TODO: change to long based id + //private long id; + /** * Constructs a new pipeline data structure. * - * @param pipelineChannel - transport information for this container + * @param leaderID - Leader datanode id + * @param lifeCycleState - Pipeline State + * @param replicationType - Replication protocol + * @param replicationFactor - replication count on datanodes + * @param name - pipelineName */ - public Pipeline(PipelineChannel pipelineChannel) { - this.pipelineChannel = pipelineChannel; - data = null; + public Pipeline(String leaderID, HddsProtos.LifeCycleState lifeCycleState, + HddsProtos.ReplicationType replicationType, + HddsProtos.ReplicationFactor replicationFactor, String name) { + this.leaderID = leaderID; + this.lifeCycleState = lifeCycleState; + this.type = replicationType; + this.factor = replicationFactor; + this.name = name; + datanodes = new TreeMap<>(); } /** * Gets pipeline object from protobuf. * - * @param pipeline - ProtoBuf definition for the pipeline. + * @param pipelineProto - ProtoBuf definition for the pipeline. * @return Pipeline Object */ - public static Pipeline getFromProtoBuf(HddsProtos.Pipeline pipeline) { - Preconditions.checkNotNull(pipeline); - PipelineChannel pipelineChannel = - PipelineChannel.getFromProtoBuf(pipeline.getPipelineChannel()); - return new Pipeline(pipelineChannel); + public static Pipeline getFromProtoBuf( + HddsProtos.Pipeline pipelineProto) { + Preconditions.checkNotNull(pipelineProto); + Pipeline pipeline = + new Pipeline(pipelineProto.getLeaderID(), + pipelineProto.getState(), + pipelineProto.getType(), + pipelineProto.getFactor(), + pipelineProto.getName()); + + for (HddsProtos.DatanodeDetailsProto dataID : + pipelineProto.getMembersList()) { + pipeline.addMember(DatanodeDetails.getFromProtoBuf(dataID)); + } + return pipeline; } + /** + * returns the replication count. + * @return Replication Factor + */ public HddsProtos.ReplicationFactor getFactor() { - return pipelineChannel.getFactor(); + return factor; } /** @@ -98,19 +126,34 @@ public class Pipeline { */ @JsonIgnore public DatanodeDetails getLeader() { - return pipelineChannel.getDatanodes().get(pipelineChannel.getLeaderID()); + return getDatanodes().get(leaderID); } + public void addMember(DatanodeDetails datanodeDetails) { + datanodes.put(datanodeDetails.getUuid().toString(), + datanodeDetails); + } + + public Map getDatanodes() { + return datanodes; + } /** * Returns the leader host. * * @return First Machine. */ public String getLeaderHost() { - return pipelineChannel.getDatanodes() - .get(pipelineChannel.getLeaderID()).getHostName(); + return getDatanodes() + .get(leaderID).getHostName(); } + /** + * + * @return lead + */ + public String getLeaderID() { + return leaderID; + } /** * Returns all machines that make up this pipeline. * @@ -118,7 +161,7 @@ public class Pipeline { */ @JsonIgnore public List getMachines() { - return new ArrayList<>(pipelineChannel.getDatanodes().values()); + return new ArrayList<>(getDatanodes().values()); } /** @@ -128,7 +171,7 @@ public class Pipeline { */ public List getDatanodeHosts() { List dataHosts = new ArrayList<>(); - for (DatanodeDetails id : pipelineChannel.getDatanodes().values()) { + for (DatanodeDetails id :getDatanodes().values()) { dataHosts.add(id.getHostName()); } return dataHosts; @@ -143,46 +186,31 @@ public class Pipeline { public HddsProtos.Pipeline getProtobufMessage() { HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder(); - builder.setPipelineChannel(this.pipelineChannel.getProtobufMessage()); + for (DatanodeDetails datanode : datanodes.values()) { + builder.addMembers(datanode.getProtoBufMessage()); + } + builder.setLeaderID(leaderID); + + if (this.getLifeCycleState() != null) { + builder.setState(this.getLifeCycleState()); + } + if (this.getType() != null) { + builder.setType(this.getType()); + } + + if (this.getFactor() != null) { + builder.setFactor(this.getFactor()); + } return builder.build(); } - /** - * Returns private data that is set on this pipeline. - * - * @return blob, the user can interpret it any way they like. - */ - public byte[] getData() { - if (this.data != null) { - return Arrays.copyOf(this.data, this.data.length); - } else { - return null; - } - } - - @VisibleForTesting - public PipelineChannel getPipelineChannel() { - return pipelineChannel; - } - - /** - * Set private data on pipeline. - * - * @param data -- private data. - */ - public void setData(byte[] data) { - if (data != null) { - this.data = Arrays.copyOf(data, data.length); - } - } - /** * Gets the State of the pipeline. * * @return - LifeCycleStates. */ public HddsProtos.LifeCycleState getLifeCycleState() { - return pipelineChannel.getLifeCycleState(); + return lifeCycleState; } /** @@ -191,7 +219,7 @@ public class Pipeline { * @return - Name of the pipeline */ public String getPipelineName() { - return pipelineChannel.getName(); + return name; } /** @@ -200,16 +228,16 @@ public class Pipeline { * @return type - Standalone, Ratis, Chained. */ public HddsProtos.ReplicationType getType() { - return pipelineChannel.getType(); + return type; } @Override public String toString() { final StringBuilder b = new StringBuilder(getClass().getSimpleName()) .append("["); - pipelineChannel.getDatanodes().keySet().stream() + getDatanodes().keySet().stream() .forEach(id -> b. - append(id.endsWith(pipelineChannel.getLeaderID()) ? "*" + id : id)); + append(id.endsWith(getLeaderID()) ? "*" + id : id)); b.append(" name:").append(getPipelineName()); if (getType() != null) { b.append(" type:").append(getType().toString()); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java deleted file mode 100644 index 655751d737a..00000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/PipelineChannel.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.container.common.helpers; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; - -import java.util.Map; -import java.util.TreeMap; - -/** - * PipelineChannel information for a {@link Pipeline}. - */ -public class PipelineChannel { - @JsonIgnore - private String leaderID; - @JsonIgnore - private Map datanodes; - private LifeCycleState lifeCycleState; - private ReplicationType type; - private ReplicationFactor factor; - private String name; - // TODO: change to long based id - //private long id; - - public PipelineChannel(String leaderID, LifeCycleState lifeCycleState, - ReplicationType replicationType, ReplicationFactor replicationFactor, - String name) { - this.leaderID = leaderID; - this.lifeCycleState = lifeCycleState; - this.type = replicationType; - this.factor = replicationFactor; - this.name = name; - datanodes = new TreeMap<>(); - } - - public String getLeaderID() { - return leaderID; - } - - public Map getDatanodes() { - return datanodes; - } - - public LifeCycleState getLifeCycleState() { - return lifeCycleState; - } - - public ReplicationType getType() { - return type; - } - - public ReplicationFactor getFactor() { - return factor; - } - - public String getName() { - return name; - } - - public void addMember(DatanodeDetails datanodeDetails) { - datanodes.put(datanodeDetails.getUuid().toString(), - datanodeDetails); - } - - @JsonIgnore - public HddsProtos.PipelineChannel getProtobufMessage() { - HddsProtos.PipelineChannel.Builder builder = - HddsProtos.PipelineChannel.newBuilder(); - for (DatanodeDetails datanode : datanodes.values()) { - builder.addMembers(datanode.getProtoBufMessage()); - } - builder.setLeaderID(leaderID); - - if (this.getLifeCycleState() != null) { - builder.setState(this.getLifeCycleState()); - } - if (this.getType() != null) { - builder.setType(this.getType()); - } - - if (this.getFactor() != null) { - builder.setFactor(this.getFactor()); - } - return builder.build(); - } - - public static PipelineChannel getFromProtoBuf( - HddsProtos.PipelineChannel transportProtos) { - Preconditions.checkNotNull(transportProtos); - PipelineChannel pipelineChannel = - new PipelineChannel(transportProtos.getLeaderID(), - transportProtos.getState(), - transportProtos.getType(), - transportProtos.getFactor(), - transportProtos.getName()); - - for (HddsProtos.DatanodeDetailsProto dataID : - transportProtos.getMembersList()) { - pipelineChannel.addMember(DatanodeDetails.getFromProtoBuf(dataID)); - } - return pipelineChannel; - } -} diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index a9a703eb000..816efa7c253 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -40,7 +40,7 @@ message Port { required uint32 value = 2; } -message PipelineChannel { +message Pipeline { required string leaderID = 1; repeated DatanodeDetailsProto members = 2; optional LifeCycleState state = 3 [default = OPEN]; @@ -49,12 +49,6 @@ message PipelineChannel { optional string name = 6; } -// A pipeline is composed of PipelineChannel (Ratis/StandAlone) that back a -// container. -message Pipeline { - required PipelineChannel pipelineChannel = 2; -} - message KeyValue { required string key = 1; optional string value = 2; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java index 937076cfb77..cbb2ba75c2e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java @@ -127,12 +127,12 @@ public class ContainerCloser { // to SCM. In that case also, data node will ignore this command. HddsProtos.Pipeline pipeline = info.getPipeline(); - for (HddsProtos.DatanodeDetailsProto datanodeDetails : pipeline - .getPipelineChannel().getMembersList()) { + for (HddsProtos.DatanodeDetailsProto datanodeDetails : + pipeline.getMembersList()) { nodeManager.addDatanodeCommand( DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(), new CloseContainerCommand(info.getContainerID(), - pipeline.getPipelineChannel().getType())); + pipeline.getType())); } if (!commandIssued.containsKey(info.getContainerID())) { commandIssued.put(info.getContainerID(), diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java index 832fcc669a3..48affa41129 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java @@ -17,7 +17,6 @@ package org.apache.hadoop.hdds.scm.pipelines; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -36,12 +35,12 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class PipelineManager { private static final Logger LOG = LoggerFactory.getLogger(PipelineManager.class); - private final List activePipelineChannels; - private final AtomicInteger conduitsIndex; + private final List activePipelines; + private final AtomicInteger pipelineIndex; public PipelineManager() { - activePipelineChannels = new LinkedList<>(); - conduitsIndex = new AtomicInteger(0); + activePipelines = new LinkedList<>(); + pipelineIndex = new AtomicInteger(0); } /** @@ -59,9 +58,9 @@ public abstract class PipelineManager { /** * In the Ozone world, we have a very simple policy. * - * 1. Try to create a pipelineChannel if there are enough free nodes. + * 1. Try to create a pipeline if there are enough free nodes. * - * 2. This allows all nodes to part of a pipelineChannel quickly. + * 2. This allows all nodes to part of a pipeline quickly. * * 3. if there are not enough free nodes, return conduits in a * round-robin fashion. @@ -70,28 +69,28 @@ public abstract class PipelineManager { * Create a new placement policy that returns conduits in round robin * fashion. */ - PipelineChannel pipelineChannel = - allocatePipelineChannel(replicationFactor); - if (pipelineChannel != null) { - LOG.debug("created new pipelineChannel:{} for container with " + + Pipeline pipeline = + allocatePipeline(replicationFactor); + if (pipeline != null) { + LOG.debug("created new pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", - pipelineChannel.getName(), replicationType, replicationFactor); - activePipelineChannels.add(pipelineChannel); + pipeline.getPipelineName(), replicationType, replicationFactor); + activePipelines.add(pipeline); } else { - pipelineChannel = - findOpenPipelineChannel(replicationType, replicationFactor); - if (pipelineChannel != null) { - LOG.debug("re-used pipelineChannel:{} for container with " + + pipeline = + findOpenPipeline(replicationType, replicationFactor); + if (pipeline != null) { + LOG.debug("re-used pipeline:{} for container with " + "replicationType:{} replicationFactor:{}", - pipelineChannel.getName(), replicationType, replicationFactor); + pipeline.getPipelineName(), replicationType, replicationFactor); } } - if (pipelineChannel == null) { - LOG.error("Get pipelineChannel call failed. We are not able to find" + - "free nodes or operational pipelineChannel."); + if (pipeline == null) { + LOG.error("Get pipeline call failed. We are not able to find" + + "free nodes or operational pipeline."); return null; } else { - return new Pipeline(pipelineChannel); + return pipeline; } } @@ -106,19 +105,19 @@ public abstract class PipelineManager { } } - public abstract PipelineChannel allocatePipelineChannel( + public abstract Pipeline allocatePipeline( ReplicationFactor replicationFactor) throws IOException; /** - * Find a PipelineChannel that is operational. + * Find a Pipeline that is operational. * * @return - Pipeline or null */ - private PipelineChannel findOpenPipelineChannel( + private Pipeline findOpenPipeline( ReplicationType type, ReplicationFactor factor) { - PipelineChannel pipelineChannel = null; + Pipeline pipeline = null; final int sentinal = -1; - if (activePipelineChannels.size() == 0) { + if (activePipelines.size() == 0) { LOG.error("No Operational conduits found. Returning null."); return null; } @@ -126,26 +125,26 @@ public abstract class PipelineManager { int nextIndex = sentinal; for (; startIndex != nextIndex; nextIndex = getNextIndex()) { // Just walk the list in a circular way. - PipelineChannel temp = - activePipelineChannels + Pipeline temp = + activePipelines .get(nextIndex != sentinal ? nextIndex : startIndex); - // if we find an operational pipelineChannel just return that. + // if we find an operational pipeline just return that. if ((temp.getLifeCycleState() == LifeCycleState.OPEN) && (temp.getFactor() == factor) && (temp.getType() == type)) { - pipelineChannel = temp; + pipeline = temp; break; } } - return pipelineChannel; + return pipeline; } /** - * gets the next index of the PipelineChannel to get. + * gets the next index of the Pipeline to get. * * @return index in the link list to get. */ private int getNextIndex() { - return conduitsIndex.incrementAndGet() % activePipelineChannels.size(); + return pipelineIndex.incrementAndGet() % activePipelines.size(); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java index 2e56043c6bb..508ca9bd3b6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.placement.algorithms @@ -85,20 +84,20 @@ public class PipelineSelector { * The first of the list will be the leader node. * @return pipeline corresponding to nodes */ - public static PipelineChannel newPipelineFromNodes( + public static Pipeline newPipelineFromNodes( List nodes, LifeCycleState state, ReplicationType replicationType, ReplicationFactor replicationFactor, String name) { Preconditions.checkNotNull(nodes); Preconditions.checkArgument(nodes.size() > 0); String leaderId = nodes.get(0).getUuidString(); - PipelineChannel - pipelineChannel = new PipelineChannel(leaderId, state, replicationType, + Pipeline + pipeline = new Pipeline(leaderId, state, replicationType, replicationFactor, name); for (DatanodeDetails node : nodes) { - pipelineChannel.addMember(node); + pipeline.addMember(node); } - return pipelineChannel; + return pipeline; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java index 70489b9253c..ace87582349 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.XceiverClientRatis; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -68,12 +67,12 @@ public class RatisManagerImpl extends PipelineManager { } /** - * Allocates a new ratis PipelineChannel from the free nodes. + * Allocates a new ratis Pipeline from the free nodes. * * @param factor - One or Three * @return PipelineChannel. */ - public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { + public Pipeline allocatePipeline(ReplicationFactor factor) { List newNodesList = new LinkedList<>(); List datanodes = nodeManager.getNodes(NodeState.HEALTHY); int count = getReplicationCount(factor); @@ -87,22 +86,20 @@ public class RatisManagerImpl extends PipelineManager { // once a datanode has been added to a pipeline, exclude it from // further allocations ratisMembers.addAll(newNodesList); - LOG.info("Allocating a new ratis pipelineChannel of size: {}", count); + LOG.info("Allocating a new ratis pipeline of size: {}", count); // Start all channel names with "Ratis", easy to grep the logs. String conduitName = PREFIX + UUID.randomUUID().toString().substring(PREFIX.length()); - PipelineChannel pipelineChannel = + Pipeline pipeline= PipelineSelector.newPipelineFromNodes(newNodesList, LifeCycleState.OPEN, ReplicationType.RATIS, factor, conduitName); - Pipeline pipeline = - new Pipeline(pipelineChannel); try (XceiverClientRatis client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) { client.createPipeline(pipeline.getPipelineName(), newNodesList); } catch (IOException e) { return null; } - return pipelineChannel; + return pipeline; } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java index 82683293512..e76027fb2b4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java @@ -17,7 +17,7 @@ package org.apache.hadoop.hdds.scm.pipelines.standalone; import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.node.NodeManager; @@ -67,12 +67,12 @@ public class StandaloneManagerImpl extends PipelineManager { /** - * Allocates a new standalone PipelineChannel from the free nodes. + * Allocates a new standalone Pipeline from the free nodes. * * @param factor - One - * @return PipelineChannel. + * @return Pipeline. */ - public PipelineChannel allocatePipelineChannel(ReplicationFactor factor) { + public Pipeline allocatePipeline(ReplicationFactor factor) { List newNodesList = new LinkedList<>(); List datanodes = nodeManager.getNodes(NodeState.HEALTHY); int count = getReplicationCount(factor); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java index adb212a409c..d06d568ae09 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestDeletedBlockLog.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.Mapping; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -357,11 +356,10 @@ public class TestDeletedBlockLog { private void mockContainerInfo(Mapping mappingService, long containerID, DatanodeDetails dd) throws IOException { - PipelineChannel pipelineChannel = - new PipelineChannel("fake", LifeCycleState.OPEN, + Pipeline pipeline = + new Pipeline("fake", LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "fake"); - pipelineChannel.addMember(dd); - Pipeline pipeline = new Pipeline(pipelineChannel); + pipeline.addMember(dd); ContainerInfo.Builder builder = new ContainerInfo.Builder(); builder.setPipeline(pipeline); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 0254984d23f..50cdd548581 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.XceiverClient; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.TestGenericTestUtils; @@ -92,13 +91,12 @@ public class TestMiniOzoneCluster { for(HddsDatanodeService dn : datanodes) { // Create a single member pipe line DatanodeDetails datanodeDetails = dn.getDatanodeDetails(); - final PipelineChannel pipelineChannel = - new PipelineChannel(datanodeDetails.getUuidString(), + final Pipeline pipeline = + new Pipeline(datanodeDetails.getUuidString(), HddsProtos.LifeCycleState.OPEN, HddsProtos.ReplicationType.STAND_ALONE, HddsProtos.ReplicationFactor.ONE, "test"); - pipelineChannel.addMember(datanodeDetails); - Pipeline pipeline = new Pipeline(pipelineChannel); + pipeline.addMember(datanodeDetails); // Verify client is able to connect to the container try (XceiverClient client = new XceiverClient(pipeline, conf)){ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 7046132f6f9..459da2ebf86 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.KeyData; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; @@ -136,14 +135,14 @@ public final class ContainerTestHelper { Preconditions.checkArgument(i.hasNext()); final DatanodeDetails leader = i.next(); String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3); - final PipelineChannel pipelineChannel = - new PipelineChannel(leader.getUuidString(), LifeCycleState.OPEN, + final Pipeline pipeline = + new Pipeline(leader.getUuidString(), LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName); - pipelineChannel.addMember(leader); + pipeline.addMember(leader); for(; i.hasNext();) { - pipelineChannel.addMember(i.next()); + pipeline.addMember(i.next()); } - return new Pipeline(pipelineChannel); + return pipeline; } /** @@ -207,8 +206,6 @@ public final class ContainerTestHelper { ContainerProtos.WriteChunkRequestProto .newBuilder(); - Pipeline newPipeline = - new Pipeline(pipeline.getPipelineChannel()); writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); byte[] data = getData(datalen); @@ -223,7 +220,7 @@ public final class ContainerTestHelper { request.setCmdType(ContainerProtos.Type.WriteChunk); request.setWriteChunk(writeRequest); request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(newPipeline.getLeader().getUuidString()); + request.setDatanodeUuid(pipeline.getLeader().getUuidString()); return request.build(); } @@ -241,8 +238,6 @@ public final class ContainerTestHelper { throws Exception { ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest = ContainerProtos.PutSmallFileRequestProto.newBuilder(); - Pipeline newPipeline = - new Pipeline(pipeline.getPipelineChannel()); byte[] data = getData(dataLen); ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, dataLen); setDataChecksum(info, data); @@ -266,7 +261,7 @@ public final class ContainerTestHelper { request.setCmdType(ContainerProtos.Type.PutSmallFile); request.setPutSmallFile(smallFileRequest); request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(newPipeline.getLeader().getUuidString()); + request.setDatanodeUuid(pipeline.getLeader().getUuidString()); return request.build(); } diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java index 1b1153b18a1..375450ca09e 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkContainerStateMap.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.util.Time; @@ -150,14 +149,14 @@ public class BenchMarkContainerStateMap { Preconditions.checkArgument(i.hasNext()); final DatanodeDetails leader = i.next(); String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(5); - final PipelineChannel pipelineChannel = - new PipelineChannel(leader.getUuidString(), OPEN, + final Pipeline pipeline = + new Pipeline(leader.getUuidString(), OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName); - pipelineChannel.addMember(leader); + pipeline.addMember(leader); for (; i.hasNext();) { - pipelineChannel.addMember(i.next()); + pipeline.addMember(i.next()); } - return new Pipeline(pipelineChannel); + return pipeline; } @Benchmark diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index 13b04c31ee5..3d4426f82b5 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ozone.genesis; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; @@ -32,7 +33,6 @@ import org.apache.hadoop.ozone.container.common.impl.Dispatcher; import org.apache.hadoop.ozone.container.common.impl.KeyManagerImpl; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineChannel; import org.apache.hadoop.util.Time; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; @@ -78,7 +78,7 @@ public class BenchMarkDatanodeDispatcher { private String baseDir; private String datanodeUuid; private Dispatcher dispatcher; - private PipelineChannel pipelineChannel; + private Pipeline pipeline; private ByteString data; private Random random; private AtomicInteger containerCount; @@ -96,7 +96,7 @@ public class BenchMarkDatanodeDispatcher { @Setup(Level.Trial) public void initialize() throws IOException { datanodeUuid = UUID.randomUUID().toString(); - pipelineChannel = new PipelineChannel("127.0.0.1", + pipeline = new Pipeline("127.0.0.1", LifeCycleState.OPEN, ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "SA-" + UUID.randomUUID()); diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java index d4ac994cff9..2bd43fb93af 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java @@ -519,11 +519,11 @@ public class SQLCLI extends Configured implements Tool { LOG.info("Insert to sql container db, for container {}", containerID); String insertContainerInfo = String.format( INSERT_CONTAINER_INFO, containerID, - pipeline.getPipelineChannel().getLeaderID()); + pipeline.getLeaderID()); executeSQL(conn, insertContainerInfo); for (HddsProtos.DatanodeDetailsProto dd : - pipeline.getPipelineChannel().getMembersList()) { + pipeline.getMembersList()) { String uuid = dd.getUuid(); if (!uuidChecked.contains(uuid)) { // we may also not use this checked set, but catch exception instead