HDDS-587. Add new classes for pipeline management. Contributed by Lokesh Jain.

(cherry picked from commit 5c8e023ba3)
Nandakumar 2018-10-13 03:22:20 +05:30
parent 9aa6c7b2ad
commit c141a912f0
15 changed files with 1772 additions and 0 deletions

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Represents a group of datanodes which store a container.
*/
public final class Pipeline {
private final PipelineID id;
private final ReplicationType type;
private final ReplicationFactor factor;
private LifeCycleState state;
private List<DatanodeDetails> nodes;
private Pipeline(PipelineID id, ReplicationType type,
ReplicationFactor factor, LifeCycleState state,
List<DatanodeDetails> nodes) {
this.id = id;
this.type = type;
this.factor = factor;
this.state = state;
this.nodes = nodes;
}
/**
* Returns the ID of this pipeline.
*
* @return PipelineID
*/
public PipelineID getID() {
return id;
}
/**
* Returns the replication type.
*
* @return ReplicationType - STAND_ALONE or RATIS.
*/
public ReplicationType getType() {
return type;
}
/**
* Returns the replication factor.
*
* @return ReplicationFactor - ONE or THREE.
*/
public ReplicationFactor getFactor() {
return factor;
}
/**
* Returns the state of the pipeline.
*
* @return LifeCycleState - current lifecycle state.
*/
public LifeCycleState getLifeCycleState() {
return state;
}
/**
* Returns the list of nodes which form this pipeline.
*
* @return List of DatanodeDetails
*/
public List<DatanodeDetails> getNodes() {
return new ArrayList<>(nodes);
}
public HddsProtos.Pipeline getProtobufMessage() {
HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder();
builder.setId(id.getProtobuf());
builder.setType(type);
// factor must be set as well, otherwise fromProtobuf cannot round-trip it
builder.setFactor(factor);
builder.setState(state);
builder.addAllMembers(nodes.stream().map(
DatanodeDetails::getProtoBufMessage).collect(Collectors.toList()));
return builder.build();
}
public static Pipeline fromProtobuf(HddsProtos.Pipeline pipeline) {
return new Pipeline(PipelineID.getFromProtobuf(pipeline.getId()),
pipeline.getType(), pipeline.getFactor(), pipeline.getState(),
pipeline.getMembersList().stream().map(DatanodeDetails::getFromProtoBuf)
.collect(Collectors.toList()));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Pipeline that = (Pipeline) o;
return new EqualsBuilder()
.append(id, that.id)
.append(type, that.type)
.append(factor, that.factor)
.append(state, that.state)
.append(nodes, that.nodes)
.isEquals();
}
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(id)
.append(type)
.append(factor)
.append(state)
.append(nodes)
.toHashCode();
}
public static Builder newBuilder() {
return new Builder();
}
public static Builder newBuilder(Pipeline pipeline) {
return new Builder(pipeline);
}
/**
* Builder class for Pipeline.
*/
public static class Builder {
private PipelineID id = null;
private ReplicationType type = null;
private ReplicationFactor factor = null;
private LifeCycleState state = null;
private List<DatanodeDetails> nodes = null;
public Builder() {}
public Builder(Pipeline pipeline) {
this.id = pipeline.getID();
this.type = pipeline.getType();
this.factor = pipeline.getFactor();
this.state = pipeline.getLifeCycleState();
this.nodes = pipeline.getNodes();
}
public Builder setId(PipelineID id1) {
this.id = id1;
return this;
}
public Builder setType(ReplicationType type1) {
this.type = type1;
return this;
}
public Builder setFactor(ReplicationFactor factor1) {
this.factor = factor1;
return this;
}
public Builder setState(LifeCycleState state1) {
this.state = state1;
return this;
}
public Builder setNodes(List<DatanodeDetails> nodes1) {
this.nodes = nodes1;
return this;
}
public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
Preconditions.checkNotNull(factor);
Preconditions.checkNotNull(state);
Preconditions.checkNotNull(nodes);
return new Pipeline(id, type, factor, state, nodes);
}
}
}
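
A minimal usage sketch of the new builder and the protobuf round trip. The datanode variable is a placeholder for any DatanodeDetails instance (not part of the patch), and value equality after the round trip assumes DatanodeDetails compares by UUID:

// Sketch only: 'datanode' is an assumed DatanodeDetails instance.
Pipeline original = Pipeline.newBuilder()
    .setId(PipelineID.randomId())
    .setType(ReplicationType.STAND_ALONE)
    .setFactor(ReplicationFactor.ONE)
    .setState(LifeCycleState.ALLOCATED)
    .setNodes(java.util.Collections.singletonList(datanode))
    .build();
// Serialize and parse back; the copy should be equal by value.
Pipeline copy = Pipeline.fromProtobuf(original.getProtobufMessage());
assert original.equals(copy);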

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import java.util.UUID;
/**
* ID for the pipeline; the ID is based on a UUID.
*/
public final class PipelineID {
private UUID id;
private PipelineID(UUID id) {
this.id = id;
}
public static PipelineID randomId() {
return new PipelineID(UUID.randomUUID());
}
public static PipelineID valueOf(UUID id) {
return new PipelineID(id);
}
public UUID getId() {
return id;
}
public HddsProtos.PipelineID getProtobuf() {
return HddsProtos.PipelineID.newBuilder().setId(id.toString()).build();
}
public static PipelineID getFromProtobuf(HddsProtos.PipelineID protos) {
return new PipelineID(UUID.fromString(protos.getId()));
}
@Override
public String toString() {
return "PipelineID=" + id;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PipelineID that = (PipelineID) o;
return id.equals(that.id);
}
@Override
public int hashCode() {
return id.hashCode();
}
}
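
The protobuf mapping is a plain UUID-as-string round trip; a short sketch:

PipelineID id = PipelineID.randomId();
HddsProtos.PipelineID proto = id.getProtobuf();      // UUID serialized as string
PipelineID parsed = PipelineID.getFromProtobuf(proto);
assert id.equals(parsed);                            // equality and hashCode are UUID-based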

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
/**
Ozone supports the notion of different kinds of pipelines.
That means that we can have a replication pipeline built on
Ratis, Simple or some other protocol. All pipeline managers,
the entities in charge of pipelines, reside in this package.
*/

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Creates pipelines based on replication type.
*/
public final class PipelineFactory {
private Map<ReplicationType, PipelineProvider> providers;
PipelineFactory(NodeManager nodeManager,
PipelineStateManager stateManager) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
new SimplePipelineProvider(nodeManager));
providers.put(ReplicationType.RATIS,
new RatisPipelineProvider(nodeManager, stateManager));
}
public Pipeline create(ReplicationType type, ReplicationFactor factor)
throws IOException {
return providers.get(type).create(factor);
}
public Pipeline create(ReplicationType type, List<DatanodeDetails> nodes)
throws IOException {
return providers.get(type).create(nodes);
}
}

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
/**
* Interface which exposes the API for pipeline management.
*/
public interface PipelineManager extends Closeable {
Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException;
Pipeline createPipeline(ReplicationType type, List<DatanodeDetails> nodes)
throws IOException;
Pipeline getPipeline(PipelineID pipelineID) throws IOException;
void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
throws IOException;
void removeContainerFromPipeline(PipelineID pipelineID, ContainerID containerID)
throws IOException;
Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
throws IOException;
void finalizePipeline(PipelineID pipelineID) throws IOException;
void closePipeline(PipelineID pipelineId) throws IOException;
void removePipeline(PipelineID pipelineID) throws IOException;
}
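
Taken together, the interface implies the following call sequence over a pipeline's lifetime. This is a sketch against SCMPipelineManager (added later in this patch); conf, nodeManager and containerID are assumed, and the pipeline must already have been driven to OPEN via the CREATE/CREATED lifecycle events, which in this patch are reachable only through the package-private state manager:

PipelineManager manager = new SCMPipelineManager(conf, nodeManager);
Pipeline pipeline =
    manager.createPipeline(ReplicationType.RATIS, ReplicationFactor.THREE);
manager.addContainerToPipeline(pipeline.getID(), containerID);  // needs OPEN
Set<ContainerID> members = manager.getContainersInPipeline(pipeline.getID());
manager.finalizePipeline(pipeline.getID());  // OPEN -> CLOSING
manager.closePipeline(pipeline.getID());     // CLOSING -> CLOSED
// Removing the last container from a CLOSED pipeline also removes the
// pipeline itself (see PipelineStateMap.removeContainerFromPipeline).
manager.removeContainerFromPipeline(pipeline.getID(), containerID);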

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import java.io.IOException;
import java.util.List;
/**
* Interface for creating pipelines.
*/
public interface PipelineProvider {
Pipeline create(ReplicationFactor factor) throws IOException;
Pipeline create(List<DatanodeDetails> nodes) throws IOException;
}

@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE;
/**
* Manages the state of pipelines in SCM. All write operations like pipeline
* creation, removal and updates should come via SCMPipelineManager.
* PipelineStateMap holds the data structures related to pipelines and their
* state. All reads and writes of PipelineStateMap are protected by the
* read-write lock held in SCMPipelineManager.
*/
class PipelineStateManager {
private static final Logger LOG = LoggerFactory.getLogger(
PipelineStateManager.class);
private final PipelineStateMap pipelineStateMap;
private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
private final LeaseManager<Pipeline> pipelineLeaseManager;
PipelineStateManager(Configuration conf) {
this.pipelineStateMap = new PipelineStateMap();
Set<LifeCycleState> finalStates = new HashSet<>();
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
// TODO: Use LeaseManager for creation of pipelines.
// Add pipeline initialization logic.
this.pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
pipelineCreationLeaseTimeout);
this.pipelineLeaseManager.start();
finalStates.add(LifeCycleState.CLOSED);
this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
finalStates);
initializeStateMachine();
}
/*
* Event and State Transition Mapping.
*
* State: ALLOCATED ---------------> CREATING
* Event: CREATE
*
* State: CREATING ---------------> OPEN
* Event: CREATED
*
* State: OPEN ---------------> CLOSING
* Event: FINALIZE
*
* State: CLOSING ---------------> CLOSED
* Event: CLOSE
*
* State: CREATING ---------------> CLOSED
* Event: TIMEOUT
*
*
* Pipeline State Flow:
*
* [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
*     (CREATE)     |   (CREATED)       (FINALIZE)    |
*                  |                                 |
*                  |                                 |
*                  |(TIMEOUT)                        |(CLOSE)
*                  |                                 |
*                  +--------> [CLOSED] <-----------+
*/
/**
* Registers the legal lifecycle transitions shown in the diagram above with
* the state machine.
*/
private void initializeStateMachine() {
stateMachine.addTransition(LifeCycleState.ALLOCATED,
LifeCycleState.CREATING, LifeCycleEvent.CREATE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.OPEN, LifeCycleEvent.CREATED);
stateMachine.addTransition(LifeCycleState.OPEN,
LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE);
stateMachine.addTransition(LifeCycleState.CLOSING,
LifeCycleState.CLOSED, LifeCycleEvent.CLOSE);
stateMachine.addTransition(LifeCycleState.CREATING,
LifeCycleState.CLOSED, LifeCycleEvent.TIMEOUT);
}
Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleEvent event)
throws IOException {
Pipeline pipeline = null;
try {
pipeline = pipelineStateMap.getPipeline(pipelineID);
LifeCycleState newState =
stateMachine.getNextState(pipeline.getLifeCycleState(), event);
return pipelineStateMap.updatePipelineState(pipeline.getID(), newState);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update state of pipeline %s, "
+ "reason: invalid state transition from state: %s upon "
+ "event: %s.", pipeline.getID(), pipeline.getLifeCycleState(),
event);
LOG.error(error);
throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
}
}
void addPipeline(Pipeline pipeline) throws IOException {
pipelineStateMap.addPipeline(pipeline);
}
void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
throws IOException {
pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
}
Pipeline getPipeline(PipelineID pipelineID) throws IOException {
return pipelineStateMap.getPipeline(pipelineID);
}
List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
return pipelineStateMap.getPipelines(type);
}
Set<ContainerID> getContainers(PipelineID pipelineID) throws IOException {
return pipelineStateMap.getContainers(pipelineID);
}
void removePipeline(PipelineID pipelineID) throws IOException {
pipelineStateMap.removePipeline(pipelineID);
}
void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
}
void close() {
pipelineLeaseManager.shutdown();
}
}
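
A sketch of how the event-driven transitions are exercised. The class and its methods are package-private, so callers live in this package; conf is an OzoneConfiguration and pipeline is an ALLOCATED pipeline as produced by Pipeline.newBuilder:

PipelineStateManager stateManager = new PipelineStateManager(conf);
stateManager.addPipeline(pipeline);                                          // ALLOCATED
stateManager.updatePipelineState(pipeline.getID(), LifeCycleEvent.CREATE);   // -> CREATING
stateManager.updatePipelineState(pipeline.getID(), LifeCycleEvent.CREATED);  // -> OPEN
// An illegal event, e.g. CREATE on an OPEN pipeline, is rejected with an
// SCMException carrying FAILED_TO_CHANGE_PIPELINE_STATE.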

@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
/**
* Holds the data structures which maintain the information about pipelines and
* their state. All read and write operations in this class must run under the
* read-write lock held by the caller (see SCMPipelineManager).
* Invariant: if a pipeline exists in PipelineStateMap, both pipelineMap and
* pipeline2container have a non-null mapping for it.
*/
class PipelineStateMap {
private static final Logger LOG = LoggerFactory.getLogger(
PipelineStateMap.class);
private final Map<PipelineID, Pipeline> pipelineMap;
private final Map<PipelineID, Set<ContainerID>> pipeline2container;
PipelineStateMap() {
this.pipelineMap = new HashMap<>();
this.pipeline2container = new HashMap<>();
}
/**
* Adds provided pipeline in the data structures.
*
* @param pipeline - Pipeline to add
* @throws IOException if pipeline with provided pipelineID already exists
*/
void addPipeline(Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
Preconditions.checkArgument(
pipeline.getNodes().size() == pipeline.getFactor().getNumber(),
String.format("Nodes size=%d, replication factor=%d do not match ",
pipeline.getNodes().size(), pipeline.getFactor().getNumber()));
if (pipelineMap.putIfAbsent(pipeline.getID(), pipeline) != null) {
LOG.warn("Duplicate pipeline ID detected. {}", pipeline.getID());
throw new IOException(String
.format("Duplicate pipeline ID %s detected.", pipeline.getID()));
}
pipeline2container.put(pipeline.getID(), new TreeSet<>());
}
/**
* Add container to an existing pipeline.
*
* @param pipelineID - PipelineID of the pipeline to which container is added
* @param containerID - ContainerID of the container to add
* @throws IOException if pipeline is not in open state or does not exist
*/
void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
throws IOException {
Preconditions.checkNotNull(pipelineID,
"Pipeline Id cannot be null");
Preconditions.checkNotNull(containerID,
"container Id cannot be null");
Pipeline pipeline = getPipeline(pipelineID);
// TODO: verify the state we need the pipeline to be in
if (!isOpen(pipeline)) {
throw new IOException(
String.format("%s is not in open state", pipelineID));
}
pipeline2container.get(pipelineID).add(containerID);
}
/**
* Get pipeline corresponding to specified pipelineID.
*
* @param pipelineID - PipelineID of the pipeline to be retrieved
* @return Pipeline
* @throws IOException if pipeline is not found
*/
Pipeline getPipeline(PipelineID pipelineID) throws IOException {
Pipeline pipeline = pipelineMap.get(pipelineID);
if (pipeline == null) {
throw new IOException(String.format("%s not found", pipelineID));
}
return pipeline;
}
/**
* Get list of pipelines corresponding to specified replication type.
*
* @param type - ReplicationType
* @return List of pipelines which have the specified replication type
*/
List<Pipeline> getPipelines(ReplicationType type) {
Preconditions.checkNotNull(type, "Replication type cannot be null");
return pipelineMap.values().stream().filter(p -> p.getType().equals(type))
.collect(Collectors.toList());
}
/**
* Get set of containers corresponding to a pipeline.
*
* @param pipelineID - PipelineID
* @return Set of Containers belonging to the pipeline
* @throws IOException if pipeline is not found
*/
Set<ContainerID> getContainers(PipelineID pipelineID)
throws IOException {
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
if (containerIDs == null) {
throw new IOException(String.format("%s not found", pipelineID));
}
return new HashSet<>(containerIDs);
}
/**
* Remove pipeline from the data structures.
*
* @param pipelineID - PipelineID of the pipeline to be removed
* @throws IOException if the pipeline is not empty or does not exist
*/
void removePipeline(PipelineID pipelineID) throws IOException {
Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
//TODO: Add a flag which suppresses exception if pipeline does not exist?
Set<ContainerID> containerIDs = getContainers(pipelineID);
if (!containerIDs.isEmpty()) {
throw new IOException(
String.format("Pipeline with %s is not empty", pipelineID));
}
pipelineMap.remove(pipelineID);
pipeline2container.remove(pipelineID);
}
/**
* Remove container from a pipeline.
*
* @param pipelineID - PipelineID of the pipeline from which container needs
* to be removed
* @param containerID - ContainerID of the container to remove
* @throws IOException if pipeline does not exist
*/
void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
Preconditions.checkNotNull(pipelineID,
"Pipeline Id cannot be null");
Preconditions.checkNotNull(containerID,
"container Id cannot be null");
Pipeline pipeline = getPipeline(pipelineID);
Set<ContainerID> containerIDs = pipeline2container.get(pipelineID);
containerIDs.remove(containerID);
if (containerIDs.size() == 0 && isClosingOrClosed(pipeline)) {
removePipeline(pipelineID);
}
}
/**
* Updates the state of pipeline.
*
* @param pipelineID - PipelineID of the pipeline whose state needs
* to be updated
* @param state - new state of the pipeline
* @return Pipeline with the updated state
* @throws IOException if pipeline does not exist
*/
Pipeline updatePipelineState(PipelineID pipelineID, LifeCycleState state)
throws IOException {
Preconditions.checkNotNull(pipelineID, "Pipeline Id cannot be null");
Preconditions.checkNotNull(state, "Pipeline LifeCycleState cannot be null");
Pipeline pipeline = getPipeline(pipelineID);
// Map.put would return the previous mapping; build and return the updated
// pipeline so that callers persist the new state.
Pipeline updatedPipeline =
Pipeline.newBuilder(pipeline).setState(state).build();
pipelineMap.put(pipelineID, updatedPipeline);
return updatedPipeline;
}
private boolean isClosingOrClosed(Pipeline pipeline) {
LifeCycleState state = pipeline.getLifeCycleState();
return state == LifeCycleState.CLOSING || state == LifeCycleState.CLOSED;
}
private boolean isOpen(Pipeline pipeline) {
return pipeline.getLifeCycleState() == LifeCycleState.OPEN;
}
}
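
A sketch of the auto-removal behaviour in removeContainerFromPipeline above; pipeline and containerID are illustrative, and the state changes go through updatePipelineState since PipelineStateMap takes target states directly:

PipelineStateMap map = new PipelineStateMap();
map.addPipeline(pipeline);                                       // ALLOCATED
map.updatePipelineState(pipeline.getID(), LifeCycleState.OPEN);
map.addContainerToPipeline(pipeline.getID(), containerID);
map.updatePipelineState(pipeline.getID(), LifeCycleState.CLOSING);
map.removeContainerFromPipeline(pipeline.getID(), containerID);
// The pipeline was CLOSING and is now empty, so it was removed as well:
map.getPipeline(pipeline.getID());  // throws IOException ("... not found")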

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Implements API for creating Ratis pipelines.
*/
public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager) {
this.nodeManager = nodeManager;
this.stateManager = stateManager;
}
/**
* Create pluggable container placement policy implementation instance.
*
* @param nodeManager - SCM node manager.
* @param conf - configuration.
* @return SCM container placement policy implementation instance.
*/
@SuppressWarnings("unchecked")
// TODO: should we rename ContainerPlacementPolicy to PipelinePlacementPolicy?
private static ContainerPlacementPolicy createContainerPlacementPolicy(
final NodeManager nodeManager, final Configuration conf) {
Class<? extends ContainerPlacementPolicy> implClass =
(Class<? extends ContainerPlacementPolicy>) conf.getClass(
ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
SCMContainerPlacementRandom.class);
try {
Constructor<? extends ContainerPlacementPolicy> ctor =
implClass.getDeclaredConstructor(NodeManager.class,
Configuration.class);
return ctor.newInstance(nodeManager, conf);
} catch (RuntimeException e) {
throw e;
} catch (InvocationTargetException e) {
throw new RuntimeException(implClass.getName()
+ " could not be constructed.", e.getCause());
} catch (Exception e) {
throw new IllegalArgumentException("Unable to load " +
"ContainerPlacementPolicy", e);
}
}
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
// Get set of datanodes already used for ratis pipeline
Set<DatanodeDetails> dnsUsed = new HashSet<>();
stateManager.getPipelines(ReplicationType.RATIS)
.forEach(p -> dnsUsed.addAll(p.getNodes()));
// Get list of healthy nodes
List<DatanodeDetails> dns =
nodeManager.getNodes(NodeState.HEALTHY)
.parallelStream()
.filter(dn -> !dnsUsed.contains(dn))
.limit(factor.getNumber())
.collect(Collectors.toList());
if (dns.size() < factor.getNumber()) {
String e = String
.format("Cannot create pipeline of factor %d using %d nodes.",
factor.getNumber(), dns.size());
throw new IOException(e);
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
.build();
}
@Override
public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size());
if (factor == null) {
throw new IOException(String
.format("Nodes size=%d does not match any replication factor",
nodes.size()));
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
.build();
}
}
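
The placement policy is resolved reflectively from configuration, so it can be swapped without code changes. A sketch of plugging in an alternative implementation with a (NodeManager, Configuration) constructor; SCMContainerPlacementCapacity is assumed here as the alternative:

OzoneConfiguration conf = new OzoneConfiguration();
conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
    SCMContainerPlacementCapacity.class,  // assumed alternative policy
    ContainerPlacementPolicy.class);
// createContainerPlacementPolicy(nodeManager, conf) will now instantiate it.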

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.hdds.scm
.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm
.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
/**
* Implements the API needed for management of pipelines. All write operations
* for pipelines must come via PipelineManager. It synchronizes all read and
* write operations via a ReadWriteLock.
*/
public class SCMPipelineManager implements PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMPipelineManager.class);
private final ReadWriteLock lock;
private final PipelineFactory pipelineFactory;
private final PipelineStateManager stateManager;
private final MetadataStore pipelineStore;
public SCMPipelineManager(Configuration conf, NodeManager nodeManager)
throws IOException {
this.lock = new ReentrantReadWriteLock();
this.stateManager = new PipelineStateManager(conf);
this.pipelineFactory = new PipelineFactory(nodeManager, stateManager);
int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
File metaDir = getOzoneMetaDirPath(conf);
File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
this.pipelineStore =
MetadataStoreBuilder.newBuilder()
.setConf(conf)
.setDbFile(pipelineDBPath)
.setCacheSize(cacheSize * OzoneConsts.MB)
.build();
initializePipelineState();
}
private void initializePipelineState() throws IOException {
if (pipelineStore.isEmpty()) {
LOG.info("No pipeline exists in current db");
return;
}
List<Map.Entry<byte[], byte[]>> pipelines =
pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
for (Map.Entry<byte[], byte[]> entry : pipelines) {
Pipeline pipeline = Pipeline
.fromProtobuf(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
Preconditions.checkNotNull(pipeline);
stateManager.addPipeline(pipeline);
}
}
@Override
public synchronized Pipeline createPipeline(
ReplicationType type, ReplicationFactor factor) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
stateManager.addPipeline(pipeline);
try {
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
} catch (IOException ioe) {
// if db operation fails we need to revert the pipeline creation in
// state manager.
stateManager.removePipeline(pipeline.getID());
throw ioe;
}
return pipeline;
} finally {
lock.writeLock().unlock();
}
}
@Override
public Pipeline createPipeline(ReplicationType type,
List<DatanodeDetails> nodes)
throws IOException {
// This will mostly be used to create dummy pipelines for SimplePipelineProvider.
lock.writeLock().lock();
try {
return pipelineFactory.create(type, nodes);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Pipeline getPipeline(PipelineID pipelineID) throws IOException {
lock.readLock().lock();
try {
return stateManager.getPipeline(pipelineID);
} finally {
lock.readLock().unlock();
}
}
@Override
public void addContainerToPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.addContainerToPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removeContainerFromPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
try {
stateManager.removeContainerFromPipeline(pipelineID, containerID);
} finally {
lock.writeLock().unlock();
}
}
@Override
public Set<ContainerID> getContainersInPipeline(PipelineID pipelineID)
throws IOException {
lock.readLock().lock();
try {
return stateManager.getContainers(pipelineID);
} finally {
lock.readLock().unlock();
}
}
@Override
public void finalizePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
//TODO: close all containers in this pipeline
Pipeline pipeline =
stateManager.updatePipelineState(pipelineId, LifeCycleEvent.FINALIZE);
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
} finally {
lock.writeLock().unlock();
}
}
@Override
public void closePipeline(PipelineID pipelineId) throws IOException {
lock.writeLock().lock();
try {
Pipeline pipeline =
stateManager.updatePipelineState(pipelineId, LifeCycleEvent.CLOSE);
pipelineStore.put(pipeline.getID().getProtobuf().toByteArray(),
pipeline.getProtobufMessage().toByteArray());
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removePipeline(PipelineID pipelineID) throws IOException {
lock.writeLock().lock();
try {
stateManager.removePipeline(pipelineID);
pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
} finally {
lock.writeLock().unlock();
}
}
@Override
public void close() throws IOException {
lock.writeLock().lock();
try {
stateManager.close();
} finally {
lock.writeLock().unlock();
}
}
}

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* Implements API for creating stand-alone pipelines.
*/
public class SimplePipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
public SimplePipelineProvider(NodeManager nodeManager) {
this.nodeManager = nodeManager;
}
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
List<DatanodeDetails> dns =
nodeManager.getNodes(NodeState.HEALTHY);
if (dns.size() < factor.getNumber()) {
String e = String
.format("Cannot create pipeline of factor %d using %d nodes.",
factor.getNumber(), dns.size());
throw new IOException(e);
}
Collections.shuffle(dns);
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setType(ReplicationType.STAND_ALONE)
.setFactor(factor)
.setNodes(dns.subList(0, factor.getNumber()))
.build();
}
@Override
public Pipeline create(List<DatanodeDetails> nodes) throws IOException {
ReplicationFactor factor = ReplicationFactor.valueOf(nodes.size());
if (factor == null) {
throw new IOException(String
.format("Nodes size=%d does not match any replication factor",
nodes.size()));
}
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(LifeCycleState.ALLOCATED)
.setType(ReplicationType.STAND_ALONE)
.setFactor(factor)
.setNodes(nodes)
.build();
}
}

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
/**
Ozone supports the notion of different kinds of pipelines.
That means that we can have a replication pipeline built on
Ratis, Simple or some other protocol. All pipeline managers,
the entities in charge of pipelines, reside in this package.
*/

@ -0,0 +1,246 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Test for PipelineStateManager.
*/
public class TestPipelineStateManager {
private PipelineStateManager stateManager;
@Before
public void init() throws Exception {
Configuration conf = new OzoneConfiguration();
stateManager = new PipelineStateManager(conf);
}
private Pipeline createDummyPipeline(int numNodes) {
List<DatanodeDetails> nodes = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
nodes.add(TestUtils.randomDatanodeDetails());
}
return Pipeline.newBuilder()
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE)
.setNodes(nodes)
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setId(PipelineID.randomId())
.build();
}
@Test
public void testAddAndGetPipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(0);
try {
stateManager.addPipeline(pipeline);
Assert.fail("Pipeline should not have been added");
} catch (IllegalArgumentException e) {
// replication factor and number of nodes in the pipeline do not match
Assert.assertTrue(e.getMessage().contains("do not match"));
}
// add a pipeline
pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
try {
stateManager.addPipeline(pipeline);
Assert.fail("Pipeline should not have been added");
} catch (IOException e) {
// Can not add a pipeline twice
Assert.assertTrue(e.getMessage().contains("Duplicate pipeline ID"));
}
// verify pipeline returned is same
Pipeline pipeline1 = stateManager.getPipeline(pipeline.getID());
Assert.assertSame(pipeline, pipeline1);
// clean up
stateManager.removePipeline(pipeline1.getID());
}
@Test
public void testGetPipelines() throws IOException {
Set<Pipeline> pipelines = new HashSet<>();
Pipeline pipeline = createDummyPipeline(1);
pipelines.add(pipeline);
stateManager.addPipeline(pipeline);
pipeline = createDummyPipeline(1);
pipelines.add(pipeline);
stateManager.addPipeline(pipeline);
Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS));
Assert.assertEquals(pipelines, pipelines1);
}
@Test
public void testAddAndGetContainer() throws IOException {
long containerID = 0;
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
pipeline = stateManager.getPipeline(pipeline.getID());
try {
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
Assert.fail("Container should not have been added");
} catch (IOException e) {
// adding a container is possible only when the pipeline is in open state
Assert.assertTrue(e.getMessage().contains("is not in open state"));
}
// move pipeline to open state
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED);
// add three containers
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
//verify the number of containers returned
Set<ContainerID> containerIDs =
stateManager.getContainers(pipeline.getID());
Assert.assertEquals(containerIDs.size(), containerID);
removePipeline(pipeline);
try {
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
Assert.fail("Container should not have been added");
} catch (IOException e) {
// Can not add a container to removed pipeline
Assert.assertTrue(e.getMessage().contains("not found"));
}
}
@Test
public void testRemovePipeline() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED);
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(1));
try {
stateManager.removePipeline(pipeline.getID());
Assert.fail("Pipeline should not have been removed");
} catch (IOException e) {
// can not remove a pipeline which already has containers
Assert.assertTrue(e.getMessage().contains("not empty"));
}
// remove containers and then remove the pipeline
removePipeline(pipeline);
}
@Test
public void testRemoveContainer() throws IOException {
long containerID = 1;
Pipeline pipeline = createDummyPipeline(1);
// create an open pipeline in stateMap
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED);
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(containerID));
stateManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
// removing a container from an open pipeline does not remove the pipeline
Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
// add two containers in the pipeline
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
stateManager
.addContainerToPipeline(pipeline.getID(), ContainerID.valueof(++containerID));
// move pipeline to closing state
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.FINALIZE);
stateManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(containerID));
// removal of second last container in closing or closed pipeline should
// not lead to removal of pipeline
Assert.assertNotNull(stateManager.getPipeline(pipeline.getID()));
stateManager
.removeContainerFromPipeline(pipeline.getID(), ContainerID.valueof(--containerID));
// removal of last container in closing or closed pipeline should lead to
// removal of pipeline
try {
stateManager.getPipeline(pipeline.getID());
Assert.fail("getPipeline should have failed.");
} catch (IOException e) {
Assert.assertTrue(e.getMessage().contains(" not found"));
}
}
@Test
public void testUpdatePipelineState() throws IOException {
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.CREATED, HddsProtos.LifeCycleEvent.FINALIZE,
HddsProtos.LifeCycleEvent.CLOSE);
pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
updateEvents(pipeline.getID(), HddsProtos.LifeCycleEvent.CREATE,
HddsProtos.LifeCycleEvent.TIMEOUT);
}
private void updateEvents(PipelineID pipelineID,
HddsProtos.LifeCycleEvent... events) throws IOException {
for (HddsProtos.LifeCycleEvent event : events) {
stateManager.updatePipelineState(pipelineID, event);
}
}
private void removePipeline(Pipeline pipeline) throws IOException {
Set<ContainerID> containerIDs =
stateManager.getContainers(pipeline.getID());
for (ContainerID containerID : containerIDs) {
stateManager.removeContainerFromPipeline(pipeline.getID(), containerID);
}
stateManager.removePipeline(pipeline.getID());
}
}

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Test for RatisPipelineProvider.
*/
public class TestRatisPipelineProvider {
private NodeManager nodeManager;
private PipelineProvider provider;
private PipelineStateManager stateManager;
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
provider = new RatisPipelineProvider(nodeManager,
stateManager);
}
@Test
public void testCreatePipelineWithFactor() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
// New pipeline should not overlap with the previous created pipeline
Assert.assertTrue(
CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
.isEmpty());
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
}
private List<DatanodeDetails> createListOfNodes(int nodeCount) {
List<DatanodeDetails> nodes = new ArrayList<>();
for (int i = 0; i < nodeCount; i++) {
nodes.add(TestUtils.randomDatanodeDetails());
}
return nodes;
}
@Test
public void testCreatePipelineWithNodes() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
pipeline = provider.create(createListOfNodes(factor.getNumber()));
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
}
}

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Test for SimplePipelineProvider.
*/
public class TestSimplePipelineProvider {
private NodeManager nodeManager;
private PipelineProvider provider;
private PipelineStateManager stateManager;
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
provider = new SimplePipelineProvider(nodeManager);
}
@Test
public void testCreatePipelineWithFactor() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
}
private List<DatanodeDetails> createListOfNodes(int nodeCount) {
List<DatanodeDetails> nodes = new ArrayList<>();
for (int i = 0; i < nodeCount; i++) {
nodes.add(TestUtils.randomDatanodeDetails());
}
return nodes;
}
@Test
public void testCreatePipelineWithNodes() throws IOException {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(createListOfNodes(factor.getNumber()));
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
factor = HddsProtos.ReplicationFactor.ONE;
pipeline = provider.create(createListOfNodes(factor.getNumber()));
Assert.assertEquals(pipeline.getType(),
HddsProtos.ReplicationType.STAND_ALONE);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getLifeCycleState(),
HddsProtos.LifeCycleState.ALLOCATED);
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
}
}