From c141a912f074fc829c95fbd3c168d63f745b7927 Mon Sep 17 00:00:00 2001 From: Nandakumar Date: Sat, 13 Oct 2018 03:22:20 +0530 Subject: [PATCH] HDDS-587. Add new classes for pipeline management. Contributed by Lokesh Jain. (cherry picked from commit 5c8e023ba32da3e65193f6ced354efe830dba75d) --- .../hadoop/hdds/scm/pipeline/Pipeline.java | 211 +++++++++++++++ .../hadoop/hdds/scm/pipeline/PipelineID.java | 80 ++++++ .../hdds/scm/pipeline/package-info.java | 24 ++ .../hdds/scm/pipeline/PipelineFactory.java | 56 ++++ .../hdds/scm/pipeline/PipelineManager.java | 58 +++++ .../hdds/scm/pipeline/PipelineProvider.java | 35 +++ .../scm/pipeline/PipelineStateManager.java | 179 +++++++++++++ .../hdds/scm/pipeline/PipelineStateMap.java | 212 +++++++++++++++ .../scm/pipeline/RatisPipelineProvider.java | 135 ++++++++++ .../hdds/scm/pipeline/SCMPipelineManager.java | 226 ++++++++++++++++ .../scm/pipeline/SimplePipelineProvider.java | 80 ++++++ .../hdds/scm/pipeline/package-info.java | 24 ++ .../pipeline/TestPipelineStateManager.java | 246 ++++++++++++++++++ .../pipeline/TestRatisPipelineProvider.java | 104 ++++++++ .../pipeline/TestSimplePipelineProvider.java | 102 ++++++++ 15 files changed, 1772 insertions(+) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineID.java create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java new file mode 100644 index 00000000000..b58a001a329 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Represents a group of datanodes which store a container. + */ +public final class Pipeline { + + private final PipelineID id; + private final ReplicationType type; + private final ReplicationFactor factor; + + private LifeCycleState state; + private List nodes; + + private Pipeline(PipelineID id, ReplicationType type, + ReplicationFactor factor, LifeCycleState state, + List nodes) { + this.id = id; + this.type = type; + this.factor = factor; + this.state = state; + this.nodes = nodes; + } + + /** + * Returns the ID of this pipeline. + * + * @return PipelineID + */ + public PipelineID getID() { + return id; + } + + /** + * Returns the type. + * + * @return type - Simple or Ratis. + */ + public ReplicationType getType() { + return type; + } + + /** + * Returns the factor. + * + * @return type - Simple or Ratis. + */ + public ReplicationFactor getFactor() { + return factor; + } + + /** + * Returns the State of the pipeline. + * + * @return - LifeCycleStates. + */ + public LifeCycleState getLifeCycleState() { + return state; + } + + /** + * Returns the list of nodes which form this pipeline. + * + * @return List of DatanodeDetails + */ + public List getNodes() { + return new ArrayList<>(nodes); + } + + public HddsProtos.Pipeline getProtobufMessage() { + HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder(); + builder.setId(id.getProtobuf()); + builder.setType(type); + builder.setState(state); + builder.addAllMembers(nodes.stream().map( + DatanodeDetails::getProtoBufMessage).collect(Collectors.toList())); + return builder.build(); + } + + public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) { + return new Pipeline(PipelineID.getFromProtobuf(pipeline.getId()), + pipeline.getType(), pipeline.getFactor(), pipeline.getState(), + pipeline.getMembersList().stream().map(DatanodeDetails::getFromProtoBuf) + .collect(Collectors.toList())); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + Pipeline that = (Pipeline) o; + + return new EqualsBuilder() + .append(id, that.id) + .append(type, that.type) + .append(factor, that.factor) + .append(state, that.state) + .append(nodes, that.nodes) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(id) + .append(type) + .append(factor) + .append(state) + .append(nodes) + .toHashCode(); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static Builder newBuilder(Pipeline pipeline) { + return new Builder(pipeline); + } + + /** + * Builder class for Pipeline. + */ + public static class Builder { + private PipelineID id = null; + private ReplicationType type = null; + private ReplicationFactor factor = null; + private LifeCycleState state = null; + private List nodes = null; + + public Builder() {} + + public Builder(Pipeline pipeline) { + this.id = pipeline.getID(); + this.type = pipeline.getType(); + this.factor = pipeline.getFactor(); + this.state = pipeline.getLifeCycleState(); + this.nodes = pipeline.getNodes(); + } + + public Builder setId(PipelineID id1) { + this.id = id1; + return this; + } + + public Builder setType(ReplicationType type1) { + this.type = type1; + return this; + } + + public Builder setFactor(ReplicationFactor factor1) { + this.factor = factor1; + return this; + } + + public Builder setState(LifeCycleState state1) { + this.state = state1; + return this; + } + + public Builder setNodes(List nodes1) { + this.nodes = nodes1; + return this; + } + + public Pipeline build() { + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(type); + Preconditions.checkNotNull(factor); + Preconditions.checkNotNull(state); + Preconditions.checkNotNull(nodes); + return new Pipeline(id, type, factor, state, nodes); + } + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineID.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineID.java new file mode 100644 index 00000000000..76cf55e8b12 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineID.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.util.UUID; + +/** + * ID for the pipeline, the ID is based on UUID. + */ +public final class PipelineID { + + private UUID id; + + private PipelineID(UUID id) { + this.id = id; + } + + public static PipelineID randomId() { + return new PipelineID(UUID.randomUUID()); + } + + public static PipelineID valueOf(UUID id) { + return new PipelineID(id); + } + + public UUID getId() { + return id; + } + + public HddsProtos.PipelineID getProtobuf() { + return HddsProtos.PipelineID.newBuilder().setId(id.toString()).build(); + } + + public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) { + return new PipelineID(UUID.fromString(protos.getId())); + } + + @Override + public String toString() { + return "PipelineID=" + id; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PipelineID that = (PipelineID) o; + + return id.equals(that.id); + } + + @Override + public int hashCode() { + return id.hashCode(); + } + +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java new file mode 100644 index 00000000000..51adc888661 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; +/** + Ozone supports the notion of different kind of pipelines. + That means that we can have a replication pipeline build on + Ratis, Simple or some other protocol. All Pipeline managers + the entities in charge of pipelines reside in the package. + */ \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java new file mode 100644 index 00000000000..0265ff2a693 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Creates pipeline based on replication type. + */ +public final class PipelineFactory { + + private Map providers; + + PipelineFactory(NodeManager nodeManager, + PipelineStateManager stateManager) { + providers = new HashMap<>(); + providers.put(ReplicationType.STAND_ALONE, + new SimplePipelineProvider(nodeManager)); + providers.put(ReplicationType.RATIS, + new RatisPipelineProvider(nodeManager, stateManager)); + } + + public Pipeline create(ReplicationType type, ReplicationFactor factor) + throws IOException { + return providers.get(type).create(factor); + } + + public Pipeline create(ReplicationType type, List nodes) + throws IOException { + return providers.get(type).create(nodes); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java new file mode 100644 index 00000000000..2d8cae37975 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.container.ContainerID; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +/** + * Interface which exposes the api for pipeline management. + */ +public interface PipelineManager extends Closeable { + + Pipeline createPipeline(ReplicationType type, ReplicationFactor factor) + throws IOException; + + Pipeline createPipeline(ReplicationType type, List nodes) + throws IOException; + + Pipeline getPipeline(PipelineID pipelineID) throws IOException; + + void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) + throws IOException; + + void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID) + throws IOException; + + Set getContainersInPipeline(PipelineID pipelineID) + throws IOException; + + void finalizePipeline(PipelineID pipelineID) throws IOException; + + void closePipeline(PipelineID pipelineId) throws IOException; + + void removePipeline(PipelineID pipelineID) throws IOException; +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java new file mode 100644 index 00000000000..2fc2e0eaf11 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; + +import java.io.IOException; +import java.util.List; + +/** + * Interface for creating pipelines. + */ +public interface PipelineProvider { + + Pipeline create(ReplicationFactor factor) throws IOException; + + Pipeline create(List nodes) throws IOException; +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java new file mode 100644 index 00000000000..9752b5a98bf --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException; +import org.apache.hadoop.ozone.common.statemachine.StateMachine; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE; + +/** + * Manages the state of pipelines in SCM. All write operations like pipeline + * creation, removal and updates should come via SCMPipelineManager. + * PipelineStateMap class holds the data structures related to pipeline and its + * state. All the read and write operations in PipelineStateMap are protected + * by a read write lock. + */ +class PipelineStateManager { + + private static final Logger LOG = LoggerFactory.getLogger( + org.apache.hadoop.hdds.scm.pipelines.PipelineStateManager.class); + + private final PipelineStateMap pipelineStateMap; + private final StateMachine stateMachine; + private final LeaseManager pipelineLeaseManager; + + PipelineStateManager(Configuration conf) { + this.pipelineStateMap = new PipelineStateMap(); + Set finalStates = new HashSet<>(); + long pipelineCreationLeaseTimeout = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + // TODO: Use LeaseManager for creation of pipelines. + // Add pipeline initialization logic. + this.pipelineLeaseManager = new LeaseManager<>("PipelineCreation", + pipelineCreationLeaseTimeout); + this.pipelineLeaseManager.start(); + + finalStates.add(LifeCycleState.CLOSED); + this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED, + finalStates); + initializeStateMachine(); + } + + + /* + * Event and State Transition Mapping. + * + * State: ALLOCATED ---------------> CREATING + * Event: CREATE + * + * State: CREATING ---------------> OPEN + * Event: CREATED + * + * State: OPEN ---------------> CLOSING + * Event: FINALIZE + * + * State: CLOSING ---------------> CLOSED + * Event: CLOSE + * + * State: CREATING ---------------> CLOSED + * Event: TIMEOUT + * + * + * Container State Flow: + * + * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING] + * (CREATE) | (CREATED) (FINALIZE) | + * | | + * | | + * |(TIMEOUT) |(CLOSE) + * | | + * +--------> [CLOSED] <--------+ + */ + + /** + * Add javadoc. + */ + private void initializeStateMachine() { + stateMachine.addTransition(LifeCycleState.ALLOCATED, + LifeCycleState.CREATING, LifeCycleEvent.CREATE); + + stateMachine.addTransition(LifeCycleState.CREATING, + LifeCycleState.OPEN, LifeCycleEvent.CREATED); + + stateMachine.addTransition(LifeCycleState.OPEN, + LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE); + + stateMachine.addTransition(LifeCycleState.CLOSING, + LifeCycleState.CLOSED, LifeCycleEvent.CLOSE); + + stateMachine.addTransition(LifeCycleState.CREATING, + LifeCycleState.CLOSED, LifeCycleEvent.TIMEOUT); + } + + Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleEvent event) + throws IOException { + Pipeline pipeline = null; + try { + pipeline = pipelineStateMap.getPipeline(pipelineID); + LifeCycleState newState = + stateMachine.getNextState(pipeline.getLifeCycleState(), event); + return pipelineStateMap.updatePipelineState(pipeline.getID(), newState); + } catch (InvalidStateTransitionException ex) { + String error = String.format("Failed to update pipeline state %s, " + + "reason: invalid state transition from state: %s upon " + + "event: %s.", pipeline.getID(), pipeline.getLifeCycleState(), + event); + LOG.error(error); + throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE); + } + } + + void addPipeline(Pipeline pipeline) throws IOException { + pipelineStateMap.addPipeline(pipeline); + } + + void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID) + throws IOException { + pipelineStateMap.addContainerToPipeline(pipelineId, containerID); + } + + Pipeline getPipeline(PipelineID pipelineID) throws IOException { + return pipelineStateMap.getPipeline(pipelineID); + } + + List getPipelines(HddsProtos.ReplicationType type) { + return pipelineStateMap.getPipelines(type); + } + + Set getContainers(PipelineID pipelineID) throws IOException { + return pipelineStateMap.getContainers(pipelineID); + } + + void removePipeline(PipelineID pipelineID) throws IOException { + pipelineStateMap.removePipeline(pipelineID); + } + + void removeContainerFromPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException { + pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID); + } + + void close() { + pipelineLeaseManager.shutdown(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java new file mode 100644 index 00000000000..e3f2393c407 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +/** + * Holds the data structures which maintain the information about pipeline and + * its state. All the read write operations in this class are protected by a + * lock. + * Invariant: If a pipeline exists in PipelineStateMap, both pipelineMap and + * pipeline2container would have a non-null mapping for it. + */ +class PipelineStateMap { + + private static final Logger LOG = LoggerFactory.getLogger( + PipelineStateMap.class); + + private final Map pipelineMap; + private final Map> pipeline2container; + + PipelineStateMap() { + + this.pipelineMap = new HashMap<>(); + this.pipeline2container = new HashMap<>(); + + } + + /** + * Adds provided pipeline in the data structures. + * + * @param pipeline - Pipeline to add + * @throws IOException if pipeline with provided pipelineID already exists + */ + void addPipeline(Pipeline pipeline) throws IOException { + Preconditions.checkNotNull(pipeline, "Pipeline cannot be null"); + Preconditions.checkArgument( + pipeline.getNodes().size() == pipeline.getFactor().getNumber(), + String.format("Nodes size=%d, replication factor=%d do not match ", + pipeline.getNodes().size(), pipeline.getFactor().getNumber())); + + if (pipelineMap.putIfAbsent(pipeline.getID(), pipeline) != null) { + LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getID()); + throw new IOException(String + .format("Duplicate pipeline ID %s detected.", pipeline.getID())); + } + pipeline2container.put(pipeline.getID(), new TreeSet<>()); + } + + /** + * Add container to an existing pipeline. + * + * @param pipelineID - PipelineID of the pipeline to which container is added + * @param containerID - ContainerID of the container to add + * @throws IOException if pipeline is not in open state or does not exist + */ + void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID) + throws IOException { + Preconditions.checkNotNull(pipelineID, + "Pipeline Id cannot be null"); + Preconditions.checkNotNull(containerID, + "container Id cannot be null"); + + Pipeline pipeline = getPipeline(pipelineID); + // TODO: verify the state we need the pipeline to be in + if (!isOpen(pipeline)) { + throw new IOException( + String.format("%s is not in open state", pipelineID)); + } + pipeline2container.get(pipelineID).add(containerID); + } + + /** + * Get pipeline corresponding to specified pipelineID. + * + * @param pipelineID - PipelineID of the pipeline to be retrieved + * @return Pipeline + * @throws IOException if pipeline is not found + */ + Pipeline getPipeline(PipelineID pipelineID) throws IOException { + Pipeline pipeline = pipelineMap.get(pipelineID); + if (pipeline == null) { + throw new IOException(String.format("%s not found", pipelineID)); + } + return pipeline; + } + + /** + * Get pipeline corresponding to specified replication type. + * + * @param type - ReplicationType + * @return List of pipelines which have the specified replication type + */ + List getPipelines(ReplicationType type) { + Preconditions.checkNotNull(type, "Replication type cannot be null"); + + return pipelineMap.values().stream().filter(p -> p.getType().equals(type)) + .collect(Collectors.toList()); + } + + /** + * Get set of containers corresponding to a pipeline. + * + * @param pipelineID - PipelineID + * @return Set of Containers belonging to the pipeline + * @throws IOException if pipeline is not found + */ + Set getContainers(PipelineID pipelineID) + throws IOException { + Set containerIDs = pipeline2container.get(pipelineID); + if (containerIDs == null) { + throw new IOException(String.format("%s not found", pipelineID)); + } + return new HashSet<>(containerIDs); + } + + /** + * Remove pipeline from the data structures. + * + * @param pipelineID - PipelineID of the pipeline to be removed + * @throws IOException if the pipeline is not empty or does not exist + */ + void removePipeline(PipelineID pipelineID) throws IOException { + Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); + + //TODO: Add a flag which suppresses exception if pipeline does not exist? + Set containerIDs = getContainers(pipelineID); + if (containerIDs.size() != 0) { + throw new IOException( + String.format("Pipeline with %s is not empty", pipelineID)); + } + pipelineMap.remove(pipelineID); + pipeline2container.remove(pipelineID); + } + + /** + * Remove container from a pipeline. + * + * @param pipelineID - PipelineID of the pipeline from which container needs + * to be removed + * @param containerID - ContainerID of the container to remove + * @throws IOException if pipeline does not exist + */ + void removeContainerFromPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException { + Preconditions.checkNotNull(pipelineID, + "Pipeline Id cannot be null"); + Preconditions.checkNotNull(containerID, + "container Id cannot be null"); + + Pipeline pipeline = getPipeline(pipelineID); + Set containerIDs = pipeline2container.get(pipelineID); + containerIDs.remove(containerID); + if (containerIDs.size() == 0 && isClosingOrClosed(pipeline)) { + removePipeline(pipelineID); + } + } + + /** + * Updates the state of pipeline. + * + * @param pipelineID - PipelineID of the pipeline whose state needs + * to be updated + * @param state - new state of the pipeline + * @return Pipeline with the updated state + * @throws IOException if pipeline does not exist + */ + Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleState state) + throws IOException { + Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null"); + Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null"); + + Pipeline pipeline = getPipeline(pipelineID); + pipeline = pipelineMap + .put(pipelineID, Pipeline.newBuilder(pipeline).setState(state).build()); + // TODO: Verify if need to throw exception for non-existent pipeline + return pipeline; + } + + private boolean isClosingOrClosed(Pipeline pipeline) { + LifeCycleState state = pipeline.getLifeCycleState(); + return state == LifeCycleState.CLOSING || state == LifeCycleState.CLOSED; + } + + private boolean isOpen(Pipeline pipeline) { + return pipeline.getLifeCycleState() == LifeCycleState.OPEN; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java new file mode 100644 index 00000000000..b3bed334fb6 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Implements Api for creating ratis pipelines. + */ +public class RatisPipelineProvider implements PipelineProvider { + + private final NodeManager nodeManager; + private final PipelineStateManager stateManager; + + RatisPipelineProvider(NodeManager nodeManager, + PipelineStateManager stateManager) { + this.nodeManager = nodeManager; + this.stateManager = stateManager; + } + + /** + * Create pluggable container placement policy implementation instance. + * + * @param nodeManager - SCM node manager. + * @param conf - configuration. + * @return SCM container placement policy implementation instance. + */ + @SuppressWarnings("unchecked") + // TODO: should we rename ContainerPlacementPolicy to PipelinePlacementPolicy? + private static ContainerPlacementPolicy createContainerPlacementPolicy( + final NodeManager nodeManager, final Configuration conf) { + Class implClass = + (Class) conf.getClass( + ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementRandom.class); + + try { + Constructor ctor = + implClass.getDeclaredConstructor(NodeManager.class, + Configuration.class); + return ctor.newInstance(nodeManager, conf); + } catch (RuntimeException e) { + throw e; + } catch (InvocationTargetException e) { + throw new RuntimeException(implClass.getName() + + " could not be constructed.", e.getCause()); + } catch (Exception e) { +// LOG.error("Unhandled exception occurred, Placement policy will not " + +// "be functional."); + throw new IllegalArgumentException("Unable to load " + + "ContainerPlacementPolicy", e); + } + } + + + @Override + public Pipeline create(ReplicationFactor factor) throws IOException { + // Get set of datanodes already used for ratis pipeline + Set dnsUsed = new HashSet<>(); + stateManager.getPipelines(ReplicationType.RATIS) + .forEach(p -> dnsUsed.addAll(p.getNodes())); + + // Get list of healthy nodes + List dns = + nodeManager.getNodes(NodeState.HEALTHY) + .parallelStream() + .filter(dn -> !dnsUsed.contains(dn)) + .limit(factor.getNumber()) + .collect(Collectors.toList()); + if (dns.size() < factor.getNumber()) { + String e = String + .format("Cannot create pipeline of factor %d using %d nodes.", + factor.getNumber(), dns.size()); + throw new IOException(e); + } + + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(LifeCycleState.ALLOCATED) + .setType(ReplicationType.RATIS) + .setFactor(factor) + .setNodes(dns) + .build(); + } + + @Override + public Pipeline create(List nodes) throws IOException { + ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size()); + if (factor == null) { + throw new IOException(String + .format("Nodes size=%d does not match any replication factor", + nodes.size())); + } + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(LifeCycleState.ALLOCATED) + .setType(ReplicationType.RATIS) + .setFactor(factor) + .setNodes(nodes) + .build(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java new file mode 100644 index 00000000000..3ee58495228 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.hdds.scm + .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm + .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB; + +/** + * Implements api needed for management of pipelines. All the write operations + * for pipelines must come via PipelineManager. It synchronises all write + * and read operations via a ReadWriteLock. + */ +public class SCMPipelineManager implements PipelineManager { + + private static final Logger LOG = + LoggerFactory.getLogger(SCMPipelineManager.class); + + private final ReadWriteLock lock; + private final PipelineFactory pipelineFactory; + private final PipelineStateManager stateManager; + private final MetadataStore pipelineStore; + + public SCMPipelineManager(Configuration conf, NodeManager nodeManager) + throws IOException { + this.lock = new ReentrantReadWriteLock(); + this.stateManager = new PipelineStateManager(conf); + this.pipelineFactory = new PipelineFactory(nodeManager, stateManager); + int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + File metaDir = getOzoneMetaDirPath(conf); + File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB); + this.pipelineStore = + MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(pipelineDBPath) + .setCacheSize(cacheSize * OzoneConsts.MB) + .build(); + + initializePipelineState(); + } + + private void initializePipelineState() throws IOException { + if (pipelineStore.isEmpty()) { + LOG.info("No pipeline exists in current db"); + return; + } + List> pipelines = + pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null); + + for (Map.Entry entry : pipelines) { + Pipeline pipeline = Pipeline + .fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue())); + Preconditions.checkNotNull(pipeline); + stateManager.addPipeline(pipeline); + } + } + + @Override + public synchronized Pipeline createPipeline( + ReplicationType type, ReplicationFactor factor) throws IOException { + lock.writeLock().lock(); + try { + Pipeline pipeline = pipelineFactory.create(type, factor); + stateManager.addPipeline(pipeline); + try { + pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), + pipeline.getProtobufMessage().toByteArray()); + } catch (IOException ioe) { + // if db operation fails we need to revert the pipeline creation in + // state manager. + stateManager.removePipeline(pipeline.getID()); + throw ioe; + } + return pipeline; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Pipeline createPipeline(ReplicationType type, + List nodes) + throws IOException { + // This will mostly be used to create dummy pipeline for SimplePipelines. + lock.writeLock().lock(); + try { + return pipelineFactory.create(type, nodes); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Pipeline getPipeline(PipelineID pipelineID) throws IOException { + lock.readLock().lock(); + try { + return stateManager.getPipeline(pipelineID); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void addContainerToPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.addContainerToPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void removeContainerFromPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.removeContainerFromPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Set getContainersInPipeline(PipelineID pipelineID) + throws IOException { + lock.readLock().lock(); + try { + return stateManager.getContainers(pipelineID); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void finalizePipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + //TODO: close all containers in this pipeline + Pipeline pipeline = + stateManager.updatePipelineState(pipelineId, LifeCycleEvent.FINALIZE); + pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), + pipeline.getProtobufMessage().toByteArray()); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void closePipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + Pipeline pipeline = + stateManager.updatePipelineState(pipelineId, LifeCycleEvent.CLOSE); + pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(), + pipeline.getProtobufMessage().toByteArray()); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void removePipeline(PipelineID pipelineID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.removePipeline(pipelineID); + pipelineStore.delete(pipelineID.getProtobuf().toByteArray()); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void close() throws IOException { + lock.writeLock().lock(); + try { + stateManager.close(); + } finally { + lock.writeLock().unlock(); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java new file mode 100644 index 00000000000..56ffcd0e2df --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Implements Api for creating stand alone pipelines. + */ +public class SimplePipelineProvider implements PipelineProvider { + + private final NodeManager nodeManager; + + public SimplePipelineProvider(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + @Override + public Pipeline create(ReplicationFactor factor) throws IOException { + List dns = + nodeManager.getNodes(NodeState.HEALTHY); + if (dns.size() < factor.getNumber()) { + String e = String + .format("Cannot create pipeline of factor %d using %d nodes.", + factor.getNumber(), dns.size()); + throw new IOException(e); + } + + Collections.shuffle(dns); + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(LifeCycleState.ALLOCATED) + .setType(ReplicationType.STAND_ALONE) + .setFactor(factor) + .setNodes(dns.subList(0, factor.getNumber())) + .build(); + } + + @Override + public Pipeline create(List nodes) throws IOException { + ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size()); + if (factor == null) { + throw new IOException(String + .format("Nodes size=%d does not match any replication factor", + nodes.size())); + } + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setState(LifeCycleState.ALLOCATED) + .setType(ReplicationType.STAND_ALONE) + .setFactor(factor) + .setNodes(nodes) + .build(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java new file mode 100644 index 00000000000..51adc888661 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.pipeline; +/** + Ozone supports the notion of different kind of pipelines. + That means that we can have a replication pipeline build on + Ratis, Simple or some other protocol. All Pipeline managers + the entities in charge of pipelines reside in the package. + */ \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java new file mode 100644 index 00000000000..0d4c461b575 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Test for PipelineStateManager. + */ +public class TestPipelineStateManager { + + private PipelineStateManager stateManager; + + @Before + public void init() throws Exception { + Configuration conf = new OzoneConfiguration(); + stateManager = new PipelineStateManager(conf); + } + + private Pipeline createDummyPipeline(int numNodes) { + List nodes = new ArrayList<>(); + for (int i = 0; i < numNodes; i++) { + nodes.add(TestUtils.randomDatanodeDetails()); + } + return Pipeline.newBuilder() + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.ONE) + .setNodes(nodes) + .setState(HddsProtos.LifeCycleState.ALLOCATED) + .setId(PipelineID.randomId()) + .build(); + } + + @Test + public void testAddAndGetPipeline() throws IOException { + Pipeline pipeline = createDummyPipeline(0); + try { + stateManager.addPipeline(pipeline); + Assert.fail("Pipeline should not have been added"); + } catch (IllegalArgumentException e) { + // replication factor and number of nodes in the pipeline do not match + Assert.assertTrue(e.getMessage().contains("do not match")); + } + + // add a pipeline + pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + + try { + stateManager.addPipeline(pipeline); + Assert.fail("Pipeline should not have been added"); + } catch (IOException e) { + // Can not add a pipeline twice + Assert.assertTrue(e.getMessage().contains("Duplicate pipeline ID")); + } + + // verify pipeline returned is same + Pipeline pipeline1 = stateManager.getPipeline(pipeline.getID()); + Assert.assertTrue(pipeline == pipeline1); + + // clean up + stateManager.removePipeline(pipeline1.getID()); + } + + @Test + public void testGetPipelines() throws IOException { + Set pipelines = new HashSet<>(); + Pipeline pipeline = createDummyPipeline(1); + pipelines.add(pipeline); + stateManager.addPipeline(pipeline); + pipeline = createDummyPipeline(1); + pipelines.add(pipeline); + stateManager.addPipeline(pipeline); + + Set pipelines1 = new HashSet<>(stateManager.getPipelines( + HddsProtos.ReplicationType.RATIS)); + Assert.assertEquals(pipelines, pipelines1); + } + + @Test + public void testAddAndGetContainer() throws IOException { + long containerID = 0; + Pipeline pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + pipeline = stateManager.getPipeline(pipeline.getID()); + + try { + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + Assert.fail("Container should not have been added"); + } catch (IOException e) { + // add container possible only in container with open state + Assert.assertTrue(e.getMessage().contains("is not in open state")); + } + + // move pipeline to open state + updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, + HddsProtos.LifeCycleEvent.CREATED); + + // add three containers + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID)); + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + + //verify the number of containers returned + Set containerIDs = + stateManager.getContainers(pipeline.getID()); + Assert.assertEquals(containerIDs.size(), containerID); + + removePipeline(pipeline); + try { + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + Assert.fail("Container should not have been added"); + } catch (IOException e) { + // Can not add a container to removed pipeline + Assert.assertTrue(e.getMessage().contains("not found")); + } + } + + @Test + public void testRemovePipeline() throws IOException { + Pipeline pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, + HddsProtos.LifeCycleEvent.CREATED); + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1)); + + try { + stateManager.removePipeline(pipeline.getID()); + Assert.fail("Pipeline should not have been removed"); + } catch (IOException e) { + // can not remove a pipeline which already has containers + Assert.assertTrue(e.getMessage().contains("not empty")); + } + + // remove containers and then remove the pipeline + removePipeline(pipeline); + } + + @Test + public void testRemoveContainer() throws IOException { + long containerID = 1; + Pipeline pipeline = createDummyPipeline(1); + // create an open pipeline in stateMap + stateManager.addPipeline(pipeline); + updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, + HddsProtos.LifeCycleEvent.CREATED); + + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID)); + stateManager + .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID)); + // removeContainerFromPipeline in open pipeline does not lead to removal of pipeline + Assert.assertNotNull(stateManager.getPipeline(pipeline.getID())); + + // add two containers in the pipeline + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + stateManager + .addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID)); + + // move pipeline to closing state + updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.FINALIZE); + + stateManager + .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID)); + // removal of second last container in closing or closed pipeline should + // not lead to removal of pipeline + Assert.assertNotNull(stateManager.getPipeline(pipeline.getID())); + stateManager + .removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(--containerID)); + // removal of last container in closing or closed pipeline should lead to + // removal of pipeline + try { + stateManager.getPipeline(pipeline.getID()); + Assert.fail("getPipeline should have failed."); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains(" not found")); + } + } + + @Test + public void testUpdatePipelineState() throws IOException { + Pipeline pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, + HddsProtos.LifeCycleEvent.CREATED, HddsProtos.LifeCycleEvent.FINALIZE, + HddsProtos.LifeCycleEvent.CLOSE); + + pipeline = createDummyPipeline(1); + stateManager.addPipeline(pipeline); + updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE, + HddsProtos.LifeCycleEvent.TIMEOUT); + } + + private void updateEvents(PipelineID pipelineID, + HddsProtos.LifeCycleEvent... events) throws IOException { + for (HddsProtos.LifeCycleEvent event : events) { + stateManager.updatePipelineState(pipelineID, event); + } + } + + private void removePipeline(Pipeline pipeline) throws IOException { + Set containerIDs = + stateManager.getContainers(pipeline.getID()); + for (ContainerID containerID : containerIDs) { + stateManager.removeContainerFromPipeline(pipeline.getID(), containerID); + } + stateManager.removePipeline(pipeline.getID()); + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java new file mode 100644 index 00000000000..6cf3e6269fc --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test for RatisPipelineProvider. + */ +public class TestRatisPipelineProvider { + + private NodeManager nodeManager; + private PipelineProvider provider; + private PipelineStateManager stateManager; + + @Before + public void init() throws Exception { + nodeManager = new MockNodeManager(true, 10); + stateManager = new PipelineStateManager(new OzoneConfiguration()); + provider = new RatisPipelineProvider(nodeManager, + stateManager); + } + + @Test + public void testCreatePipelineWithFactor() throws IOException { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + Pipeline pipeline = provider.create(factor); + stateManager.addPipeline(pipeline); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + + factor = HddsProtos.ReplicationFactor.ONE; + Pipeline pipeline1 = provider.create(factor); + stateManager.addPipeline(pipeline1); + // New pipeline should not overlap with the previous created pipeline + Assert.assertTrue( + CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes()) + .isEmpty()); + Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline1.getFactor(), factor); + Assert.assertEquals(pipeline1.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); + } + + private List createListOfNodes(int nodeCount) { + List nodes = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodes.add(TestUtils.randomDatanodeDetails()); + } + return nodes; + } + + @Test + public void testCreatePipelineWithNodes() throws IOException { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber())); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + + factor = HddsProtos.ReplicationFactor.ONE; + pipeline = provider.create(createListOfNodes(factor.getNumber())); + Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java new file mode 100644 index 00000000000..0f56cc83d87 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Test for SimplePipelineProvider. + */ +public class TestSimplePipelineProvider { + + private NodeManager nodeManager; + private PipelineProvider provider; + private PipelineStateManager stateManager; + + @Before + public void init() throws Exception { + nodeManager = new MockNodeManager(true, 10); + stateManager = new PipelineStateManager(new OzoneConfiguration()); + provider = new SimplePipelineProvider(nodeManager); + } + + @Test + public void testCreatePipelineWithFactor() throws IOException { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + Pipeline pipeline = provider.create(factor); + stateManager.addPipeline(pipeline); + Assert.assertEquals(pipeline.getType(), + HddsProtos.ReplicationType.STAND_ALONE); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + + factor = HddsProtos.ReplicationFactor.ONE; + Pipeline pipeline1 = provider.create(factor); + stateManager.addPipeline(pipeline1); + Assert.assertEquals(pipeline1.getType(), + HddsProtos.ReplicationType.STAND_ALONE); + Assert.assertEquals(pipeline1.getFactor(), factor); + Assert.assertEquals(pipeline1.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber()); + } + + private List createListOfNodes(int nodeCount) { + List nodes = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodes.add(TestUtils.randomDatanodeDetails()); + } + return nodes; + } + + @Test + public void testCreatePipelineWithNodes() throws IOException { + HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE; + Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber())); + Assert.assertEquals(pipeline.getType(), + HddsProtos.ReplicationType.STAND_ALONE); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + + factor = HddsProtos.ReplicationFactor.ONE; + pipeline = provider.create(createListOfNodes(factor.getNumber())); + Assert.assertEquals(pipeline.getType(), + HddsProtos.ReplicationType.STAND_ALONE); + Assert.assertEquals(pipeline.getFactor(), factor); + Assert.assertEquals(pipeline.getLifeCycleState(), + HddsProtos.LifeCycleState.ALLOCATED); + Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber()); + } +} \ No newline at end of file