HDDS-1810. SCM command to Activate and Deactivate pipelines. (#1224)

This commit is contained in:
Nanda kumar 2019-09-03 16:50:57 +05:30
parent b53d19a343
commit 0b9704f610
No known key found for this signature in database
GPG Key ID: CE6C8AB1204780DF
20 changed files with 395 additions and 9 deletions

View File

@ -226,6 +226,18 @@ public class ContainerOperationClient implements ScmClient {
return storageContainerLocationClient.listPipelines();
}
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.activatePipeline(pipelineID);
}
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.deactivatePipeline(pipelineID);
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {

View File

@ -180,6 +180,22 @@ public interface ScmClient extends Closeable {
*/
List<Pipeline> listPipelines() throws IOException;
/**
* Activates the pipeline given a pipeline ID.
*
* @param pipelineID PipelineID to activate.
* @throws IOException In case of exception while activating the pipeline
*/
void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
/**
* Deactivates the pipeline given a pipeline ID.
*
* @param pipelineID PipelineID to deactivate.
* @throws IOException In case of exception while deactivating the pipeline
*/
void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
/**
* Closes the pipeline given a pipeline ID.
*

View File

@ -354,7 +354,7 @@ public final class Pipeline {
* Possible Pipeline states in SCM.
*/
public enum PipelineState {
ALLOCATED, OPEN, CLOSED;
ALLOCATED, OPEN, DORMANT, CLOSED;
public static PipelineState fromProtobuf(HddsProtos.PipelineState state)
throws UnknownPipelineStateException {
@ -362,6 +362,7 @@ public final class Pipeline {
switch (state) {
case PIPELINE_ALLOCATED: return ALLOCATED;
case PIPELINE_OPEN: return OPEN;
case PIPELINE_DORMANT: return DORMANT;
case PIPELINE_CLOSED: return CLOSED;
default:
throw new UnknownPipelineStateException(
@ -375,6 +376,7 @@ public final class Pipeline {
switch (state) {
case ALLOCATED: return HddsProtos.PipelineState.PIPELINE_ALLOCATED;
case OPEN: return HddsProtos.PipelineState.PIPELINE_OPEN;
case DORMANT: return HddsProtos.PipelineState.PIPELINE_DORMANT;
case CLOSED: return HddsProtos.PipelineState.PIPELINE_CLOSED;
default:
throw new UnknownPipelineStateException(

View File

@ -146,6 +146,22 @@ public interface StorageContainerLocationProtocol extends Closeable {
*/
List<Pipeline> listPipelines() throws IOException;
/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
void activatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
void deactivatePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
/**
* Closes a pipeline given the pipelineID.
*

View File

@ -20,6 +20,8 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
@ -339,6 +341,36 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
}
}
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
ActivatePipelineRequestProto request =
ActivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
DeactivatePipelineRequestProto request =
DeactivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {

View File

@ -33,6 +33,8 @@ public enum SCMAction implements AuditAction {
LIST_CONTAINER,
LIST_PIPELINE,
CLOSE_PIPELINE,
ACTIVATE_PIPELINE,
DEACTIVATE_PIPELINE,
DELETE_CONTAINER,
IN_SAFE_MODE,
FORCE_EXIT_SAFE_MODE,

View File

@ -48,6 +48,14 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@ -257,6 +265,32 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
}
}
@Override
public ActivatePipelineResponseProto activatePipeline(
RpcController controller, ActivatePipelineRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil
.importAndCreateScope("activatePipeline", request.getTraceID())) {
impl.activatePipeline(request.getPipelineID());
return ActivatePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public DeactivatePipelineResponseProto deactivatePipeline(
RpcController controller, DeactivatePipelineRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil
.importAndCreateScope("deactivatePipeline", request.getTraceID())) {
impl.deactivatePipeline(request.getPipelineID());
return DeactivatePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ClosePipelineResponseProto closePipeline(
RpcController controller, ClosePipelineRequestProto request)

View File

@ -167,6 +167,22 @@ message ListPipelineResponseProto {
repeated Pipeline pipelines = 1;
}
message ActivatePipelineRequestProto {
required PipelineID pipelineID = 1;
optional string traceID = 2;
}
message ActivatePipelineResponseProto {
}
message DeactivatePipelineRequestProto {
required PipelineID pipelineID = 1;
optional string traceID = 2;
}
message DeactivatePipelineResponseProto {
}
message ClosePipelineRequestProto {
required PipelineID pipelineID = 1;
optional string traceID = 2;
@ -274,6 +290,12 @@ service StorageContainerLocationProtocolService {
rpc listPipelines(ListPipelineRequestProto)
returns (ListPipelineResponseProto);
rpc activatePipeline(ActivatePipelineRequestProto)
returns (ActivatePipelineResponseProto);
rpc deactivatePipeline(DeactivatePipelineRequestProto)
returns (DeactivatePipelineResponseProto);
/**
* Closes a pipeline.
*/

View File

@ -62,7 +62,8 @@ message PipelineID {
enum PipelineState {
PIPELINE_ALLOCATED = 1;
PIPELINE_OPEN = 2;
PIPELINE_CLOSED = 3;
PIPELINE_DORMANT = 3;
PIPELINE_CLOSED = 4;
}
message Pipeline {

View File

@ -77,4 +77,21 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
void triggerPipelineCreation();
void incNumBlocksAllocatedMetric(PipelineID id);
/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
void activatePipeline(PipelineID pipelineID) throws IOException;
/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
void deactivatePipeline(PipelineID pipelineID) throws IOException;
}

View File

@ -137,4 +137,28 @@ class PipelineStateManager {
}
return pipeline;
}
/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
public void activatePipeline(PipelineID pipelineID)
throws IOException {
pipelineStateMap
.updatePipelineState(pipelineID, PipelineState.OPEN);
}
/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
pipelineStateMap
.updatePipelineState(pipelineID, PipelineState.DORMANT);
}
}

View File

@ -374,7 +374,7 @@ class PipelineStateMap {
if (updatedPipeline.getPipelineState() == PipelineState.OPEN) {
// for transition to OPEN state add pipeline to query2OpenPipelines
query2OpenPipelines.get(query).add(updatedPipeline);
} else if (updatedPipeline.getPipelineState() == PipelineState.CLOSED) {
} else {
// for transition from OPEN to CLOSED state remove pipeline from
// query2OpenPipelines
query2OpenPipelines.get(query).remove(pipeline);

View File

@ -134,6 +134,7 @@ public class RatisPipelineProvider implements PipelineProvider {
Set<DatanodeDetails> dnsUsed = new HashSet<>();
stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
p -> p.getPipelineState().equals(PipelineState.OPEN) ||
p.getPipelineState().equals(PipelineState.DORMANT) ||
p.getPipelineState().equals(PipelineState.ALLOCATED))
.forEach(p -> dnsUsed.addAll(p.getNodes()));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@ -116,6 +117,7 @@ public class SCMPipelineManager implements PipelineManager {
return stateManager;
}
@VisibleForTesting
public void setPipelineProvider(ReplicationType replicationType,
PipelineProvider provider) {
pipelineFactory.setProvider(replicationType, provider);
@ -349,6 +351,30 @@ public class SCMPipelineManager implements PipelineManager {
backgroundPipelineCreator.triggerPipelineCreation();
}
/**
* Activates a dormant pipeline.
*
* @param pipelineID ID of the pipeline to activate.
* @throws IOException in case of any Exception
*/
@Override
public void activatePipeline(PipelineID pipelineID)
throws IOException {
stateManager.activatePipeline(pipelineID);
}
/**
* Deactivates an active pipeline.
*
* @param pipelineID ID of the pipeline to deactivate.
* @throws IOException in case of any Exception
*/
@Override
public void deactivatePipeline(PipelineID pipelineID)
throws IOException {
stateManager.deactivatePipeline(pipelineID);
}
/**
* Moves the pipeline to CLOSED state and sends close container command for
* all the containers in the pipeline.

View File

@ -403,6 +403,24 @@ public class SCMClientProtocolServer implements
return scm.getPipelineManager().getPipelines();
}
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.ACTIVATE_PIPELINE, null));
scm.getPipelineManager().activatePipeline(
PipelineID.getFromProtobuf(pipelineID));
}
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.DEACTIVATE_PIPELINE, null));
scm.getPipelineManager().deactivatePipeline(
PipelineID.getFromProtobuf(pipelineID));
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {

View File

@ -32,7 +32,9 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ActivatePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.DeactivatePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
@ -84,6 +86,8 @@ import picocli.CommandLine.Option;
CreateSubcommand.class,
CloseSubcommand.class,
ListPipelinesSubcommand.class,
ActivatePipelineSubcommand.class,
DeactivatePipelineSubcommand.class,
ClosePipelineSubcommand.class,
TopologySubcommand.class,
ReplicationManagerCommands.class

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli.pipeline;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import picocli.CommandLine;
import java.util.concurrent.Callable;
/**
* Handler of activatePipeline command.
*/
@CommandLine.Command(
name = "activatePipeline",
description = "Activates the given Pipeline",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class ActivatePipelineSubcommand implements Callable<Void> {
@CommandLine.ParentCommand
private SCMCLI parent;
@CommandLine.Parameters(description = "ID of the pipeline to activate")
private String pipelineId;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.createScmClient()) {
scmClient.activatePipeline(
HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
return null;
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli.pipeline;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import picocli.CommandLine;
import java.util.concurrent.Callable;
/**
* Handler of deactivatePipeline command.
*/
@CommandLine.Command(
name = "deactivatePipeline",
description = "Deactivates the given Pipeline",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class DeactivatePipelineSubcommand implements Callable<Void> {
@CommandLine.ParentCommand
private SCMCLI parent;
@CommandLine.Parameters(description = "ID of the pipeline to deactivate")
private String pipelineId;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.createScmClient()) {
scmClient.deactivatePipeline(
HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
return null;
}
}
}

View File

@ -246,6 +246,13 @@ public class TestPipelineStateManager {
stateManager.openPipeline(pipeline.getId());
pipelines.add(pipeline);
// 5 pipelines in dormant state for each type and factor
pipeline = createDummyPipeline(type, factor, factor.getNumber());
stateManager.addPipeline(pipeline);
stateManager.openPipeline(pipeline.getId());
stateManager.deactivatePipeline(pipeline.getId());
pipelines.add(pipeline);
// 5 pipelines in closed state for each type and factor
pipeline = createDummyPipeline(type, factor, factor.getNumber());
stateManager.addPipeline(pipeline);

View File

@ -35,9 +35,9 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.Pipeline
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
@ -55,8 +55,8 @@ public class TestSCMPipelineManager {
private static File testDir;
private static Configuration conf;
@BeforeClass
public static void setUp() throws Exception {
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
@ -68,8 +68,8 @@ public class TestSCMPipelineManager {
nodeManager = new MockNodeManager(true, 20);
}
@AfterClass
public static void cleanup() throws IOException {
@After
public void cleanup() {
FileUtil.fullyDelete(testDir);
}
@ -269,4 +269,50 @@ public class TestSCMPipelineManager {
"NumPipelineCreationFailed", metrics);
Assert.assertTrue(numPipelineCreateFailed == 0);
}
@Test
public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager =
new SCMPipelineManager(conf, nodeManager, new EventQueue());
final PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
final Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
final PipelineID pid = pipeline.getId();
pipelineManager.openPipeline(pid);
pipelineManager.addContainerToPipeline(pid, ContainerID.valueof(1));
Assert.assertTrue(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
Assert.assertEquals(Pipeline.PipelineState.OPEN,
pipelineManager.getPipeline(pid).getPipelineState());
pipelineManager.deactivatePipeline(pid);
Assert.assertEquals(Pipeline.PipelineState.DORMANT,
pipelineManager.getPipeline(pid).getPipelineState());
Assert.assertFalse(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
pipelineManager.activatePipeline(pid);
Assert.assertTrue(pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE,
Pipeline.PipelineState.OPEN).contains(pipeline));
pipelineManager.close();
}
}