HDDS-893. pipeline status is ALLOCATED in scmcli listPipelines command. Contributed by Lokesh Jain.

Arpit Agarwal 2018-12-19 13:42:06 +05:30
parent b1ce9aa3b3
commit cf571133b8
7 changed files with 107 additions and 15 deletions


@@ -24,6 +24,7 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
 
 /**
  * Class wraps ozone container info.
@@ -48,13 +49,15 @@ public class ContainerWithPipeline implements Comparator<ContainerWithPipeline>,
   }
 
   public static ContainerWithPipeline fromProtobuf(
-      HddsProtos.ContainerWithPipeline allocatedContainer) {
+      HddsProtos.ContainerWithPipeline allocatedContainer)
+      throws UnknownPipelineStateException {
     return new ContainerWithPipeline(
         ContainerInfo.fromProtobuf(allocatedContainer.getContainerInfo()),
         Pipeline.getFromProtobuf(allocatedContainer.getPipeline()));
   }
 
-  public HddsProtos.ContainerWithPipeline getProtobuf() {
+  public HddsProtos.ContainerWithPipeline getProtobuf()
+      throws UnknownPipelineStateException {
     HddsProtos.ContainerWithPipeline.Builder builder =
         HddsProtos.ContainerWithPipeline.newBuilder();
     builder.setContainerInfo(getContainerInfo().getProtobuf())


@@ -136,11 +136,13 @@ public final class Pipeline {
     return nodeStatus.isEmpty();
   }
 
-  public HddsProtos.Pipeline getProtobufMessage() {
+  public HddsProtos.Pipeline getProtobufMessage()
+      throws UnknownPipelineStateException {
     HddsProtos.Pipeline.Builder builder = HddsProtos.Pipeline.newBuilder()
         .setId(id.getProtobuf())
         .setType(type)
         .setFactor(factor)
+        .setState(PipelineState.getProtobuf(state))
         .setLeaderID("")
         .addAllMembers(nodeStatus.keySet().stream()
             .map(DatanodeDetails::getProtoBufMessage)
@@ -148,11 +150,13 @@ public final class Pipeline {
     return builder.build();
   }
 
-  public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline) {
+  public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
+      throws UnknownPipelineStateException {
     Preconditions.checkNotNull(pipeline, "Pipeline is null");
     return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
         .setFactor(pipeline.getFactor())
         .setType(pipeline.getType())
-        .setState(PipelineState.ALLOCATED)
+        .setState(PipelineState.fromProtobuf(pipeline.getState()))
         .setNodes(pipeline.getMembersList().stream()
             .map(DatanodeDetails::getFromProtoBuf).collect(Collectors.toList()))
         .build();
@@ -270,6 +274,32 @@ public final class Pipeline {
    * Possible Pipeline states in SCM.
    */
   public enum PipelineState {
-    ALLOCATED, OPEN, CLOSED
+    ALLOCATED, OPEN, CLOSED;
+
+    public static PipelineState fromProtobuf(HddsProtos.PipelineState state)
+        throws UnknownPipelineStateException {
+      Preconditions.checkNotNull(state, "Pipeline state is null");
+      switch (state) {
+      case PIPELINE_ALLOCATED: return ALLOCATED;
+      case PIPELINE_OPEN: return OPEN;
+      case PIPELINE_CLOSED: return CLOSED;
+      default:
+        throw new UnknownPipelineStateException(
+            "Pipeline state: " + state + " is not recognized.");
+      }
+    }
+
+    public static HddsProtos.PipelineState getProtobuf(PipelineState state)
+        throws UnknownPipelineStateException {
+      Preconditions.checkNotNull(state, "Pipeline state is null");
+      switch (state) {
+      case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED;
+      case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN;
+      case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED;
+      default:
+        throw new UnknownPipelineStateException(
+            "Pipeline state: " + state + " is not recognized.");
+      }
+    }
   }
 }
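For reference, a minimal usage sketch of the two new converters (the demo class name below is hypothetical, not part of the patch): a state now survives the round trip through protobuf instead of collapsing to ALLOCATED.

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;

// Hypothetical demo class, not part of this commit.
public final class PipelineStateRoundTrip {
  public static void main(String[] args) throws UnknownPipelineStateException {
    HddsProtos.PipelineState wire = PipelineState.getProtobuf(PipelineState.OPEN);
    PipelineState back = PipelineState.fromProtobuf(wire);
    System.out.println(back); // OPEN; before this change deserialization always yielded ALLOCATED
  }
}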


@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import java.io.IOException;
+
+/**
+ * Signals that a pipeline state is not recognized.
+ */
+public class UnknownPipelineStateException extends IOException {
+
+  /**
+   * Constructs an {@code UnknownPipelineStateException} with {@code null}
+   * as its error detail message.
+   */
+  public UnknownPipelineStateException() {
+    super();
+  }
+
+  /**
+   * Constructs an {@code UnknownPipelineStateException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public UnknownPipelineStateException(String message) {
+    super(message);
+  }
+}
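Because the new exception subclasses IOException, methods that already declare throws IOException absorb it unchanged, while callers can still catch it specifically. A small hypothetical sketch (the helper class and error message are illustrative only):

import java.io.IOException;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;

// Hypothetical caller, not part of this commit: decode a pipeline proto and
// turn an unrecognized state into a clearer error for the operator.
final class DecodePipelineExample {
  static Pipeline decode(HddsProtos.Pipeline proto) throws IOException {
    try {
      return Pipeline.getFromProtobuf(proto);
    } catch (UnknownPipelineStateException e) {
      // The persisted/wire state is none of ALLOCATED, OPEN, CLOSED.
      throw new IOException("Corrupt or incompatible pipeline record", e);
    }
  }
}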


@@ -315,9 +315,12 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
           .newBuilder().build();
       ListPipelineResponseProto response = rpcProxy.listPipelines(
           NULL_RPC_CONTROLLER, request);
-      return response.getPipelinesList().stream()
-          .map(Pipeline::getFromProtobuf)
-          .collect(Collectors.toList());
+      List<Pipeline> list = new ArrayList<>();
+      for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) {
+        Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline);
+        list.add(fromProtobuf);
+      }
+      return list;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
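The stream here is replaced with a plain loop because Pipeline.getFromProtobuf now declares a checked exception, which a java.util.function.Function lambda cannot throw. A hypothetical alternative that keeps the stream style, shown only to illustrate the trade-off, would tunnel the exception through UncheckedIOException:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;

// Hypothetical helper, not part of this commit.
final class PipelineProtoLists {
  private PipelineProtoLists() { }

  static List<Pipeline> fromProtobuf(List<HddsProtos.Pipeline> protos)
      throws IOException {
    try {
      return protos.stream()
          .map(p -> {
            try {
              return Pipeline.getFromProtobuf(p);
            } catch (UnknownPipelineStateException e) {
              throw new UncheckedIOException(e);  // tunnel the checked exception
            }
          })
          .collect(Collectors.toList());
    } catch (UncheckedIOException e) {
      throw e.getCause();  // unwrap back to the original IOException subtype
    }
  }
}

The plain loop in the patch keeps the method signature honest and avoids the wrap/unwrap noise.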


@@ -227,9 +227,11 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
     try {
       ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
           .newBuilder();
-      List<Pipeline> pipelineIDs = impl.listPipelines();
-      pipelineIDs.stream().map(Pipeline::getProtobufMessage)
-          .forEach(builder::addPipelines);
+      List<Pipeline> pipelines = impl.listPipelines();
+      for (Pipeline pipeline : pipelines) {
+        HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage();
+        builder.addPipelines(protobufMessage);
+      }
       return builder.build();
     } catch (IOException e) {
       throw new ServiceException(e);


@@ -44,11 +44,17 @@ message PipelineID {
     required string id = 1;
 }
 
+enum PipelineState {
+    PIPELINE_ALLOCATED = 1;
+    PIPELINE_OPEN = 2;
+    PIPELINE_CLOSED = 3;
+}
+
 message Pipeline {
     required string leaderID = 1;
     repeated DatanodeDetailsProto members = 2;
     // TODO: remove the state and leaderID from this class
-    optional LifeCycleState state = 3 [default = OPEN];
+    optional PipelineState state = 3 [default = PIPELINE_ALLOCATED];
     optional ReplicationType type = 4 [default = STAND_ALONE];
     optional ReplicationFactor factor = 5 [default = ONE];
     required PipelineID id = 6;
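A minimal check of the new default (illustrative only, assuming the standard protobuf-java generated accessors): a Pipeline message that carries no explicit state, such as one written before getProtobufMessage started calling setState, reads back as PIPELINE_ALLOCATED.

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

// Illustrative only, not part of this commit.
final class PipelineProtoDefaultCheck {
  public static void main(String[] args) {
    HddsProtos.Pipeline p = HddsProtos.Pipeline.newBuilder()
        .setLeaderID("")                                           // required field
        .setId(HddsProtos.PipelineID.newBuilder().setId("demo"))   // required field
        .build();
    System.out.println(p.hasState());  // false: the optional field was never set
    System.out.println(p.getState());  // PIPELINE_ALLOCATED: the declared default
  }
}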


@@ -99,8 +99,10 @@ public class SCMPipelineManager implements PipelineManager {
         (MetadataKeyFilters.MetadataKeyFilter[])null);
     for (Map.Entry<byte[], byte[]> entry : pipelines) {
-      Pipeline pipeline = Pipeline.getFromProtobuf(
-          HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+      HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
+          .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+      Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
+          HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
       Preconditions.checkNotNull(pipeline);
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);