diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java index e2856d74ac4..c97354f5a64 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java @@ -226,6 +226,18 @@ public class ContainerOperationClient implements ScmClient { return storageContainerLocationClient.listPipelines(); } + @Override + public void activatePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + storageContainerLocationClient.activatePipeline(pipelineID); + } + + @Override + public void deactivatePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + storageContainerLocationClient.deactivatePipeline(pipelineID); + } + @Override public void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java index c2dd5f94965..226ceda9255 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java @@ -180,6 +180,22 @@ public interface ScmClient extends Closeable { */ List listPipelines() throws IOException; + /** + * Activates the pipeline given a pipeline ID. + * + * @param pipelineID PipelineID to activate. + * @throws IOException In case of exception while activating the pipeline + */ + void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + + /** + * Deactivates the pipeline given a pipeline ID. + * + * @param pipelineID PipelineID to deactivate. + * @throws IOException In case of exception while deactivating the pipeline + */ + void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + /** * Closes the pipeline given a pipeline ID. * diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java index a84118a0f97..1627569b1a0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java @@ -354,7 +354,7 @@ public final class Pipeline { * Possible Pipeline states in SCM. 
*/ public enum PipelineState { - ALLOCATED, OPEN, CLOSED; + ALLOCATED, OPEN, DORMANT, CLOSED; public static PipelineState fromProtobuf(HddsProtos.PipelineState state) throws UnknownPipelineStateException { @@ -362,6 +362,7 @@ public final class Pipeline { switch (state) { case PIPELINE_ALLOCATED: return ALLOCATED; case PIPELINE_OPEN: return OPEN; + case PIPELINE_DORMANT: return DORMANT; case PIPELINE_CLOSED: return CLOSED; default: throw new UnknownPipelineStateException( @@ -375,6 +376,7 @@ public final class Pipeline { switch (state) { case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED; case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN; + case DORMANT: return HddsProtos.PipelineState.PIPELINE_DORMANT; case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED; default: throw new UnknownPipelineStateException( diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 565ce4729b4..88db8205a40 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@ -146,6 +146,22 @@ public interface StorageContainerLocationProtocol extends Closeable { */ List listPipelines() throws IOException; + /** + * Activates a dormant pipeline. + * + * @param pipelineID ID of the pipeline to activate. + * @throws IOException in case of any Exception + */ + void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + + /** + * Deactivates an active pipeline. + * + * @param pipelineID ID of the pipeline to deactivate. + * @throws IOException in case of any Exception + */ + void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException; + /** * Closes a pipeline given the pipelineID. 
* diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java index 9e316f7ba10..ab3fcd185ff 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java @@ -20,6 +20,8 @@ import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto; @@ -339,6 +341,36 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB } } + @Override + public void activatePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + try { + ActivatePipelineRequestProto request = + ActivatePipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setPipelineID(pipelineID) + .build(); + rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public void deactivatePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + try { + DeactivatePipelineRequestProto request = + DeactivatePipelineRequestProto.newBuilder() + .setTraceID(TracingUtil.exportCurrentSpan()) + .setPipelineID(pipelineID) + .build(); + rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + @Override public void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java index b3b4879f75e..d03ad157220 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java @@ -33,6 +33,8 @@ public enum SCMAction implements AuditAction { LIST_CONTAINER, LIST_PIPELINE, CLOSE_PIPELINE, + ACTIVATE_PIPELINE, + DEACTIVATE_PIPELINE, DELETE_CONTAINER, IN_SAFE_MODE, FORCE_EXIT_SAFE_MODE, diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java index 30ef7ea888c..99c9e8d7c31 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java +++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java @@ -48,6 +48,14 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerLocationProtocolProtos.ContainerRequestProto; import org.apache.hadoop.hdds.protocol.proto @@ -257,6 +265,32 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB } } + @Override + public ActivatePipelineResponseProto activatePipeline( + RpcController controller, ActivatePipelineRequestProto request) + throws ServiceException { + try (Scope ignored = TracingUtil + .importAndCreateScope("activatePipeline", request.getTraceID())) { + impl.activatePipeline(request.getPipelineID()); + return ActivatePipelineResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public DeactivatePipelineResponseProto deactivatePipeline( + RpcController controller, DeactivatePipelineRequestProto request) + throws ServiceException { + try (Scope ignored = TracingUtil + .importAndCreateScope("deactivatePipeline", request.getTraceID())) { + impl.deactivatePipeline(request.getPipelineID()); + return DeactivatePipelineResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + @Override public ClosePipelineResponseProto closePipeline( RpcController controller, ClosePipelineRequestProto request) diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto index 4e4b50bea31..0c358760360 100644 --- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto @@ -167,6 +167,22 @@ message ListPipelineResponseProto { repeated Pipeline pipelines = 1; } +message ActivatePipelineRequestProto { + required PipelineID pipelineID = 1; + optional string traceID = 2; +} + +message ActivatePipelineResponseProto { +} + +message DeactivatePipelineRequestProto { + required PipelineID pipelineID = 1; + optional string traceID = 2; +} + +message DeactivatePipelineResponseProto { +} + message ClosePipelineRequestProto { required PipelineID pipelineID = 1; optional string traceID = 2; @@ -274,6 +290,12 @@ service StorageContainerLocationProtocolService { rpc listPipelines(ListPipelineRequestProto) returns (ListPipelineResponseProto); + rpc activatePipeline(ActivatePipelineRequestProto) + returns (ActivatePipelineResponseProto); + + rpc deactivatePipeline(DeactivatePipelineRequestProto) + returns (DeactivatePipelineResponseProto); + /** * Closes a pipeline. 
*/ diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index 6475f4c901c..d2bb355ff8a 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -62,7 +62,8 @@ message PipelineID { enum PipelineState { PIPELINE_ALLOCATED = 1; PIPELINE_OPEN = 2; - PIPELINE_CLOSED = 3; + PIPELINE_DORMANT = 3; + PIPELINE_CLOSED = 4; } message Pipeline { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java index 70bd64c8a48..bd8fa2d9238 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java @@ -77,4 +77,21 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean { void triggerPipelineCreation(); void incNumBlocksAllocatedMetric(PipelineID id); + + /** + * Activates a dormant pipeline. + * + * @param pipelineID ID of the pipeline to activate. + * @throws IOException in case of any Exception + */ + void activatePipeline(PipelineID pipelineID) throws IOException; + + /** + * Deactivates an active pipeline. + * + * @param pipelineID ID of the pipeline to deactivate. + * @throws IOException in case of any Exception + */ + void deactivatePipeline(PipelineID pipelineID) throws IOException; + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java index 6be747b7e85..76150579f84 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java @@ -137,4 +137,28 @@ class PipelineStateManager { } return pipeline; } + + /** + * Activates a dormant pipeline. + * + * @param pipelineID ID of the pipeline to activate. + * @throws IOException in case of any Exception + */ + public void activatePipeline(PipelineID pipelineID) + throws IOException { + pipelineStateMap + .updatePipelineState(pipelineID, PipelineState.OPEN); + } + + /** + * Deactivates an active pipeline. + * + * @param pipelineID ID of the pipeline to deactivate. 
+ * @throws IOException in case of any Exception + */ + public void deactivatePipeline(PipelineID pipelineID) + throws IOException { + pipelineStateMap + .updatePipelineState(pipelineID, PipelineState.DORMANT); + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java index 6fc27a6cdb5..443378cd183 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java @@ -374,7 +374,7 @@ class PipelineStateMap { if (updatedPipeline.getPipelineState() == PipelineState.OPEN) { // for transition to OPEN state add pipeline to query2OpenPipelines query2OpenPipelines.get(query).add(updatedPipeline); - } else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) { + } else { // for transition from OPEN to CLOSED state remove pipeline from // query2OpenPipelines query2OpenPipelines.get(query).remove(pipeline); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java index 14fde0785a4..9e227331d66 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java @@ -134,6 +134,7 @@ public class RatisPipelineProvider implements PipelineProvider { Set dnsUsed = new HashSet<>(); stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter( p -> p.getPipelineState().equals(PipelineState.OPEN) || + p.getPipelineState().equals(PipelineState.DORMANT) || p.getPipelineState().equals(PipelineState.ALLOCATED)) .forEach(p -> dnsUsed.addAll(p.getNodes())); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 6660f47dbfa..d6457f342fc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.pipeline; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -116,6 +117,7 @@ public class SCMPipelineManager implements PipelineManager { return stateManager; } + @VisibleForTesting public void setPipelineProvider(ReplicationType replicationType, PipelineProvider provider) { pipelineFactory.setProvider(replicationType, provider); @@ -349,6 +351,30 @@ public class SCMPipelineManager implements PipelineManager { backgroundPipelineCreator.triggerPipelineCreation(); } + /** + * Activates a dormant pipeline. + * + * @param pipelineID ID of the pipeline to activate. + * @throws IOException in case of any Exception + */ + @Override + public void activatePipeline(PipelineID pipelineID) + throws IOException { + stateManager.activatePipeline(pipelineID); + } + + /** + * Deactivates an active pipeline. + * + * @param pipelineID ID of the pipeline to deactivate. 
+ * @throws IOException in case of any Exception + */ + @Override + public void deactivatePipeline(PipelineID pipelineID) + throws IOException { + stateManager.deactivatePipeline(pipelineID); + } + /** * Moves the pipeline to CLOSED state and sends close container command for * all the containers in the pipeline. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index bf75fef4f52..7d9cb3e2464 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -403,6 +403,24 @@ public class SCMClientProtocolServer implements return scm.getPipelineManager().getPipelines(); } + @Override + public void activatePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + AUDIT.logReadSuccess(buildAuditMessageForSuccess( + SCMAction.ACTIVATE_PIPELINE, null)); + scm.getPipelineManager().activatePipeline( + PipelineID.getFromProtobuf(pipelineID)); + } + + @Override + public void deactivatePipeline(HddsProtos.PipelineID pipelineID) + throws IOException { + AUDIT.logReadSuccess(buildAuditMessageForSuccess( + SCMAction.DEACTIVATE_PIPELINE, null)); + scm.getPipelineManager().deactivatePipeline( + PipelineID.getFromProtobuf(pipelineID)); + } + @Override public void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException { diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java index 3e8f3fa1b58..1b95418338c 100644 --- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/SCMCLI.java @@ -32,7 +32,9 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand; import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand; import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand; import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand; +import org.apache.hadoop.hdds.scm.cli.pipeline.ActivatePipelineSubcommand; import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand; +import org.apache.hadoop.hdds.scm.cli.pipeline.DeactivatePipelineSubcommand; import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand; import org.apache.hadoop.hdds.scm.client.ContainerOperationClient; import org.apache.hadoop.hdds.scm.client.ScmClient; @@ -84,6 +86,8 @@ import picocli.CommandLine.Option; CreateSubcommand.class, CloseSubcommand.class, ListPipelinesSubcommand.class, + ActivatePipelineSubcommand.class, + DeactivatePipelineSubcommand.class, ClosePipelineSubcommand.class, TopologySubcommand.class, ReplicationManagerCommands.class diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ActivatePipelineSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ActivatePipelineSubcommand.java new file mode 100644 index 00000000000..d8f71383000 --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/ActivatePipelineSubcommand.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.pipeline; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.cli.SCMCLI; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +import java.util.concurrent.Callable; + +/** + * Handler of activatePipeline command. + */ +@CommandLine.Command( + name = "activatePipeline", + description = "Activates the given Pipeline", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class ActivatePipelineSubcommand implements Callable { + + @CommandLine.ParentCommand + private SCMCLI parent; + + @CommandLine.Parameters(description = "ID of the pipeline to activate") + private String pipelineId; + + @Override + public Void call() throws Exception { + try (ScmClient scmClient = parent.createScmClient()) { + scmClient.activatePipeline( + HddsProtos.PipelineID.newBuilder().setId(pipelineId).build()); + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/DeactivatePipelineSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/DeactivatePipelineSubcommand.java new file mode 100644 index 00000000000..67342d0b8dd --- /dev/null +++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/DeactivatePipelineSubcommand.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.cli.pipeline; + +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.cli.SCMCLI; +import org.apache.hadoop.hdds.scm.client.ScmClient; +import picocli.CommandLine; + +import java.util.concurrent.Callable; + +/** + * Handler of deactivatePipeline command. + */ +@CommandLine.Command( + name = "deactivatePipeline", + description = "Deactivates the given Pipeline", + mixinStandardHelpOptions = true, + versionProvider = HddsVersionProvider.class) +public class DeactivatePipelineSubcommand implements Callable { + + @CommandLine.ParentCommand + private SCMCLI parent; + + @CommandLine.Parameters(description = "ID of the pipeline to deactivate") + private String pipelineId; + + @Override + public Void call() throws Exception { + try (ScmClient scmClient = parent.createScmClient()) { + scmClient.deactivatePipeline( + HddsProtos.PipelineID.newBuilder().setId(pipelineId).build()); + return null; + } + } +} \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java index 33dd7df9181..0bbfb5312f3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java @@ -246,6 +246,13 @@ public class TestPipelineStateManager { stateManager.openPipeline(pipeline.getId()); pipelines.add(pipeline); + // 5 pipelines in dormant state for each type and factor + pipeline = createDummyPipeline(type, factor, factor.getNumber()); + stateManager.addPipeline(pipeline); + stateManager.openPipeline(pipeline.getId()); + stateManager.deactivatePipeline(pipeline.getId()); + pipelines.add(pipeline); + // 5 pipelines in closed state for each type and factor pipeline = createDummyPipeline(type, factor, factor.getNumber()); stateManager.addPipeline(pipeline); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index e1b86534f68..eebaa7d2f07 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -35,9 +35,9 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.Pipeline import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.File; @@ -55,8 +55,8 @@ public class TestSCMPipelineManager { private static File testDir; private static Configuration conf; - 
@BeforeClass - public static void setUp() throws Exception { + @Before + public void setUp() throws Exception { conf = new OzoneConfiguration(); testDir = GenericTestUtils .getTestDir(TestSCMPipelineManager.class.getSimpleName()); @@ -68,8 +68,8 @@ public class TestSCMPipelineManager { nodeManager = new MockNodeManager(true, 20); } - @AfterClass - public static void cleanup() throws IOException { + @After + public void cleanup() { FileUtil.fullyDelete(testDir); } @@ -269,4 +269,50 @@ public class TestSCMPipelineManager { "NumPipelineCreationFailed", metrics); Assert.assertTrue(numPipelineCreateFailed == 0); } + + @Test + public void testActivateDeactivatePipeline() throws IOException { + final SCMPipelineManager pipelineManager = + new SCMPipelineManager(conf, nodeManager, new EventQueue()); + final PipelineProvider mockRatisProvider = + new MockRatisPipelineProvider(nodeManager, + pipelineManager.getStateManager(), conf); + + pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, + mockRatisProvider); + + final Pipeline pipeline = pipelineManager + .createPipeline(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + final PipelineID pid = pipeline.getId(); + + pipelineManager.openPipeline(pid); + pipelineManager.addContainerToPipeline(pid, ContainerID.valueof(1)); + + Assert.assertTrue(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.OPEN).contains(pipeline)); + + Assert.assertEquals(Pipeline.PipelineState.OPEN, + pipelineManager.getPipeline(pid).getPipelineState()); + + pipelineManager.deactivatePipeline(pid); + Assert.assertEquals(Pipeline.PipelineState.DORMANT, + pipelineManager.getPipeline(pid).getPipelineState()); + + Assert.assertFalse(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.OPEN).contains(pipeline)); + + pipelineManager.activatePipeline(pid); + + Assert.assertTrue(pipelineManager + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE, + Pipeline.PipelineState.OPEN).contains(pipeline)); + + pipelineManager.close(); + } }
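
Note for reviewers (not part of the patch): the new activatePipeline/deactivatePipeline calls flow from the CLI subcommands through ScmClient and the StorageContainerLocationProtocol RPC into SCMPipelineManager, toggling a pipeline between OPEN and the new DORMANT state; a DORMANT pipeline is removed from the open-pipeline set used for allocation. A minimal sketch of how a caller might drive the two new ScmClient methods follows. The wrapper class and method name are illustrative only, and it assumes an already constructed ScmClient (for example a ContainerOperationClient) and a known pipeline UUID string.

// Illustrative sketch only; this class is not part of the patch.
import java.io.IOException;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ScmClient;

public final class PipelineActivationSketch {

  private PipelineActivationSketch() {
  }

  /**
   * Takes a pipeline out of service (OPEN -> DORMANT) and then brings it
   * back (DORMANT -> OPEN) using the ScmClient methods added by this patch.
   */
  public static void cycle(ScmClient scmClient, String pipelineUuid)
      throws IOException {
    HddsProtos.PipelineID id = HddsProtos.PipelineID.newBuilder()
        .setId(pipelineUuid)
        .build();
    // Move the pipeline to DORMANT so it is no longer offered for allocation.
    scmClient.deactivatePipeline(id);
    // Re-open the pipeline so it becomes eligible again.
    scmClient.activatePipeline(id);
  }
}

From the command line, the same operations are exposed through the new activatePipeline and deactivatePipeline subcommands registered in SCMCLI, each taking the pipeline ID as its only parameter.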