From b5d7b292c988de6a8555d472a4448275522b7622 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Mon, 19 Nov 2018 22:58:25 +0530 Subject: [PATCH] HDDS-718. Introduce new SCM Commands to list and close Pipelines. Contributed by Lokesh Jain. --- .../scm/client/ContainerOperationClient.java | 11 ++++ .../hadoop/hdds/scm/client/ScmClient.java | 16 ++++++ .../hadoop/hdds/scm/pipeline/Pipeline.java | 25 +++++---- .../StorageContainerLocationProtocol.java | 17 ++++++ ...ocationProtocolClientSideTranslatorPB.java | 33 ++++++++++++ ...ocationProtocolServerSideTranslatorPB.java | 37 +++++++++++++ .../StorageContainerLocationProtocol.proto | 25 +++++++++ .../hdds/scm/pipeline/PipelineManager.java | 2 + .../scm/pipeline/PipelineStateManager.java | 4 ++ .../hdds/scm/pipeline/PipelineStateMap.java | 8 +++ .../hdds/scm/pipeline/RatisPipelineUtils.java | 2 +- .../hdds/scm/pipeline/SCMPipelineManager.java | 10 ++++ .../scm/server/SCMClientProtocolServer.java | 18 +++++++ .../apache/hadoop/hdds/scm/cli/SCMCLI.java | 6 ++- .../cli/pipeline/ClosePipelineSubcommand.java | 53 +++++++++++++++++++ .../cli/pipeline/ListPipelinesSubcommand.java | 48 +++++++++++++++++ .../hdds/scm/cli/pipeline/package-info.java | 22 ++++++++ .../pipeline/TestPipelineStateManager.java | 7 +++ 18 files changed, 331 insertions(+), 13 deletions(-) create mode 100644 hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java create mode 100644 hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java create mode 100644 hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index ef72e38368c..85b5d29f32f 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -224,6 +224,17 @@ public class ContainerOperationClient implements ScmClient { factor, nodePool); } + @Override + public List listPipelines() throws IOException { + return storageContainerLocationClient.listPipelines(); + } + + @Override + public void closePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + storageContainerLocationClient.closePipeline(pipelineID); + } + @Override public void close() { try { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index 6250adf421e..4f4239f703d 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -172,6 +172,22 @@ public interface ScmClient extends Closeable { HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) throws IOException; + /** + * Returns the list of active Pipelines. + * + * @return list of Pipeline + * @throws IOException in case of any exception + */ + List listPipelines() throws IOException; + + /** + * Closes the pipeline given a pipeline ID. + * + * @param pipelineID PipelineID to close. + * @throws IOException In case of exception while closing the pipeline + */ + void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + /** * Check if SCM is in chill mode. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index 62081f48351..a103bd74396 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -188,6 +188,20 @@ public final class Pipeline { .toHashCode(); } + @Override + public String toString() { + final StringBuilder b = + new StringBuilder(getClass().getSimpleName()).append("["); + b.append(" Id: ").append(id.getId()); + b.append(", Nodes: "); + nodeStatus.keySet().forEach(b::append); + b.append(", Type:").append(getType()); + b.append(", Factor:").append(getFactor()); + b.append(", State:").append(getPipelineState()); + b.append("]"); + return b.toString(); + } + public static Builder newBuilder() { return new Builder(); } @@ -196,17 +210,6 @@ public final class Pipeline { return new Builder(pipeline); } - @Override - public String toString() { - return "Pipeline{" + - "id=" + id + - ", type=" + type + - ", factor=" + factor + - ", state=" + state + - ", nodeStatus=" + nodeStatus + - '}'; - } - /** * Builder class for Pipeline. */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 14c55c31b8c..c8a9dcfb6be 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -126,6 +126,23 @@ public interface StorageContainerLocationProtocol { HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool) throws IOException; + /** + * Returns the list of active Pipelines. + * + * @return list of Pipeline + * + * @throws IOException in case of any exception + */ + List listPipelines() throws IOException; + + /** + * Closes a pipeline given the pipelineID. + * + * @param pipelineID ID of the pipeline to demolish + * @throws IOException + */ + void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + /** * Returns information about SCM. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 3a4fa4645ea..117e58dd4aa 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -20,6 +20,9 @@ import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto; @@ -64,6 +67,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * This class is the client-side translator to translate the requests made on @@ -304,6 +308,35 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB } } + @Override + public List listPipelines() throws IOException { + try { + ListPipelineRequestProto request = ListPipelineRequestProto + .newBuilder().build(); + ListPipelineResponseProto response = rpcProxy.listPipelines( + NULL_RPC_CONTROLLER, request); + return response.getPipelinesList().stream() + .map(Pipeline::getFromProtobuf) + .collect(Collectors.toList()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void closePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + try { + ClosePipelineRequestProto request = + ClosePipelineRequestProto.newBuilder() + .setPipelineID(pipelineID) + .build(); + rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public ScmInfo getScmInfo() throws IOException { HddsProtos.GetScmInfoRequestProto request = diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index e2a4ee00ce5..2ae559a4aa1 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -43,6 +44,14 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.ContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.ContainerResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ClosePipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ListPipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ListPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.GetContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -211,6 +220,34 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB return null; } + @Override + public ListPipelineResponseProto listPipelines( + RpcController controller, ListPipelineRequestProto request) + throws ServiceException { + try { + ListPipelineResponseProto.Builder builder = ListPipelineResponseProto + .newBuilder(); + List pipelineIDs = impl.listPipelines(); + pipelineIDs.stream().map(Pipeline::getProtobufMessage) + .forEach(builder::addPipelines); + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ClosePipelineResponseProto closePipeline( + RpcController controller, ClosePipelineRequestProto request) + throws ServiceException { + try { + impl.closePipeline(request.getPipelineID()); + return ClosePipelineResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public HddsProtos.GetScmInfoRespsonseProto getScmInfo( RpcController controller, HddsProtos.GetScmInfoRequestProto req) diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index 71190ac1a58..fe34fc0e38d 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -149,6 +149,19 @@ message PipelineResponseProto { optional string errorMessage = 3; } +message ListPipelineRequestProto { +} + +message ListPipelineResponseProto { + repeated Pipeline pipelines = 1; +} + +message ClosePipelineRequestProto { + required PipelineID pipelineID = 1; +} + +message ClosePipelineResponseProto { +} message InChillModeRequestProto { } @@ -218,6 +231,18 @@ service StorageContainerLocationProtocolService { rpc allocatePipeline(PipelineRequestProto) returns (PipelineResponseProto); + /** + * Returns the list of Pipelines managed by SCM. + */ + rpc listPipelines(ListPipelineRequestProto) + returns (ListPipelineResponseProto); + + /** + * Closes a pipeline. + */ + rpc closePipeline(ClosePipelineRequestProto) + returns (ClosePipelineResponseProto); + /** * Returns information about SCM. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index cce09f3dd69..47a6eacb5c7 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -41,6 +41,8 @@ public interface PipelineManager extends Closeable { Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException; + List getPipelines(); + List getPipelines(ReplicationType type); List getPipelines(ReplicationType type, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 9f95378882b..1edb23a4f3f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -56,6 +56,10 @@ class PipelineStateManager { return pipelineStateMap.getPipeline(pipelineID); } + public List getPipelines() { + return pipelineStateMap.getPipelines(); + } + List getPipelines(ReplicationType type) { return pipelineStateMap.getPipelines(type); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 20dfa03594e..8a0ffbb6591 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -114,6 +114,14 @@ class PipelineStateMap { return pipeline; } + /** + * Get list of pipelines in SCM. + * @return List of pipelines + */ + public List getPipelines() { + return new ArrayList<>(pipelineMap.values()); + } + /** * Get pipeline corresponding to specified replication type. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java index dd799625134..6cf3abe30c6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java @@ -77,7 +77,7 @@ public final class RatisPipelineUtils { * @param ozoneConf - Ozone configuration * @throws IOException */ - static void destroyPipeline(PipelineManager pipelineManager, + public static void destroyPipeline(PipelineManager pipelineManager, Pipeline pipeline, Configuration ozoneConf) throws IOException { final RaftGroup group = RatisHelper.newRaftGroup(pipeline); LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index cf1955dfe38..382483f87a8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -146,6 +146,16 @@ public class SCMPipelineManager implements PipelineManager { } } + @Override + public List getPipelines() { + lock.readLock().lock(); + try { + return stateManager.getPipelines(); + } finally { + lock.readLock().unlock(); + } + } + @Override public List getPipelines(ReplicationType type) { lock.readLock().lock(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index b59042e75a0..d80d6e215d1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -42,6 +42,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -306,6 +309,21 @@ public class SCMClientProtocolServer implements return null; } + @Override + public List listPipelines() { + return scm.getPipelineManager().getPipelines(); + } + + @Override + public void closePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + PipelineManager pipelineManager = scm.getPipelineManager(); + Pipeline pipeline = + pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID)); + RatisPipelineUtils + .finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false); + } + @Override public ScmInfo getScmInfo() throws IOException { ScmInfo.Builder builder = diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java index 31a36fb54a0..e385f309c0c 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand; import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand; import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand; import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand; +import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand; +import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand; import org.apache.hadoop.hdds.scm.client.ContainerOperationClient; import org.apache.hadoop.hdds.scm.client.ScmClient; import org.apache.hadoop.hdds.scm.container.ContainerInfo; @@ -78,7 +80,9 @@ import picocli.CommandLine.Option; InfoSubcommand.class, DeleteSubcommand.class, CreateSubcommand.class, - CloseSubcommand.class + CloseSubcommand.class, + ListPipelinesSubcommand.class, + ClosePipelineSubcommand.class }, mixinStandardHelpOptions = true) public class SCMCLI extends GenericCli { diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java new file mode 100644 index 00000000000..d99823b2363 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ClosePipelineSubcommand.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.pipeline; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.cli.SCMCLI; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +import java.util.concurrent.Callable; + +/** + * Handler of closePipeline command. + */ +@CommandLine.Command( + name = "closePipeline", + description = "Close pipeline", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ClosePipelineSubcommand implements Callable { + + @CommandLine.ParentCommand + private SCMCLI parent; + + @CommandLine.Parameters(description = "ID of the pipeline to close") + private String pipelineId; + + @Override + public Void call() throws Exception { + try (ScmClient scmClient = parent.createScmClient()) { + scmClient.closePipeline( + HddsProtos.PipelineID.newBuilder().setId(pipelineId).build()); + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java new file mode 100644 index 00000000000..0f8cf282068 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ListPipelinesSubcommand.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.pipeline; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.scm.cli.SCMCLI; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +import java.util.concurrent.Callable; + +/** + * Handler of listPipelines command. + */ +@CommandLine.Command( + name = "listPipelines", + description = "List all active pipelines", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ListPipelinesSubcommand implements Callable { + + @CommandLine.ParentCommand + private SCMCLI parent; + + @Override + public Void call() throws Exception { + try (ScmClient scmClient = parent.createScmClient()) { + scmClient.listPipelines().forEach(System.out::println); + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java new file mode 100644 index 00000000000..64924d1219f --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains all of the pipeline related scm commands. + */ +package org.apache.hadoop.hdds.scm.cli.pipeline; \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index d404b844b43..823cd7de793 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -100,6 +100,9 @@ public class TestPipelineStateManager { @Test public void testGetPipelines() throws IOException { + // In start there should be no pipelines + Assert.assertTrue(stateManager.getPipelines().isEmpty()); + Set pipelines = new HashSet<>(); Pipeline pipeline = createDummyPipeline(1); stateManager.addPipeline(pipeline); @@ -113,6 +116,10 @@ public class TestPipelineStateManager { Set pipelines1 = new HashSet<>(stateManager.getPipelines( HddsProtos.ReplicationType.RATIS)); Assert.assertEquals(pipelines1.size(), pipelines.size()); + + pipelines1 = new HashSet<>(stateManager.getPipelines()); + Assert.assertEquals(pipelines1.size(), pipelines.size()); + // clean up for (Pipeline pipeline1 : pipelines) { removePipeline(pipeline1);