HDDS-718. Introduce new SCM Commands to list and close Pipelines. Contributed by Lokesh Jain.

Mukul Kumar Singh 2018-11-19 22:58:25 +05:30
parent e7438a1b38
commit b5d7b292c9
18 changed files with 331 additions and 13 deletions


@@ -224,6 +224,17 @@ public class ContainerOperationClient implements ScmClient {
factor, nodePool);
}
@Override
public List<Pipeline> listPipelines() throws IOException {
return storageContainerLocationClient.listPipelines();
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
storageContainerLocationClient.closePipeline(pipelineID);
}
@Override
public void close() {
try {


@@ -172,6 +172,22 @@ public interface ScmClient extends Closeable {
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException;
/**
* Returns the list of active Pipelines.
*
* @return list of Pipeline
* @throws IOException in case of any exception
*/
List<Pipeline> listPipelines() throws IOException;
/**
* Closes the pipeline given a pipeline ID.
*
* @param pipelineID PipelineID to close.
* @throws IOException In case of exception while closing the pipeline
*/
void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
/**
* Check if SCM is in chill mode.
*

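Taken together, the two new methods support a simple list-then-close administrative flow. A minimal sketch, assuming an already-constructed ScmClient (for example the ContainerOperationClient above); the PipelineAdmin class and closeByUuid name are illustrative, not part of this commit:

import java.io.IOException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

// Hypothetical helper class, for illustration only.
public final class PipelineAdmin {
  // Finds the pipeline whose UUID matches 'uuid' and asks SCM to close it.
  public static void closeByUuid(ScmClient scmClient, String uuid)
      throws IOException {
    for (Pipeline pipeline : scmClient.listPipelines()) {
      // Assumes PipelineID.getId() yields the pipeline's UUID.
      if (pipeline.getId().getId().toString().equals(uuid)) {
        scmClient.closePipeline(
            HddsProtos.PipelineID.newBuilder().setId(uuid).build());
        return;
      }
    }
  }
}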

@@ -188,6 +188,20 @@ public final class Pipeline {
.toHashCode();
}
@Override
public String toString() {
final StringBuilder b =
new StringBuilder(getClass().getSimpleName()).append("[");
b.append(" Id: ").append(id.getId());
b.append(", Nodes: ");
nodeStatus.keySet().forEach(b::append);
b.append(", Type:").append(getType());
b.append(", Factor:").append(getFactor());
b.append(", State:").append(getPipelineState());
b.append("]");
return b.toString();
}
public static Builder newBuilder() {
return new Builder();
}
@@ -196,17 +210,6 @@
return new Builder(pipeline);
}
@Override
public String toString() {
return "Pipeline{" +
"id=" + id +
", type=" + type +
", factor=" + factor +
", state=" + state +
", nodeStatus=" + nodeStatus +
'}';
}
/**
* Builder class for Pipeline.
*/

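The relocated toString (replacing the Pipeline{...} version removed in the hunk above) renders a compact one-line summary, roughly:

Pipeline[ Id: <uuid>, Nodes: <DatanodeDetails of each member>, Type:RATIS, Factor:THREE, State:OPEN]

where the node portion is each key of nodeStatus appended via its own toString, with no separator between nodes.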

@@ -126,6 +126,23 @@ public interface StorageContainerLocationProtocol {
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
throws IOException;
/**
* Returns the list of active Pipelines.
*
* @return list of Pipeline
*
* @throws IOException in case of any exception
*/
List<Pipeline> listPipelines() throws IOException;
/**
* Closes a pipeline given the pipelineID.
*
* @param pipelineID ID of the pipeline to close
* @throws IOException in case of any exception while closing the pipeline
*/
void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
/**
* Returns information about SCM.
*


@@ -20,6 +20,9 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
@@ -64,6 +67,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* This class is the client-side translator to translate the requests made on
@@ -304,6 +308,35 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
}
}
@Override
public List<Pipeline> listPipelines() throws IOException {
try {
ListPipelineRequestProto request = ListPipelineRequestProto
.newBuilder().build();
ListPipelineResponseProto response = rpcProxy.listPipelines(
NULL_RPC_CONTROLLER, request);
return response.getPipelinesList().stream()
.map(Pipeline::getFromProtobuf)
.collect(Collectors.toList());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
ClosePipelineRequestProto request =
ClosePipelineRequestProto.newBuilder()
.setPipelineID(pipelineID)
.build();
rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public ScmInfo getScmInfo() throws IOException {
HddsProtos.GetScmInfoRequestProto request =

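Both translator methods lean on Pipeline's two proto conversions being inverses of each other. A quick sanity sketch, not part of the commit, where pipeline is any existing Pipeline (e.g. one returned by listPipelines()):

// Round-trip: the client deserializes exactly what the server serialized.
HddsProtos.Pipeline proto = pipeline.getProtobufMessage();
Pipeline roundTripped = Pipeline.getFromProtobuf(proto);
// The ID must survive the round trip, otherwise a pipeline discovered via
// listPipelines could not be targeted by closePipeline.
assert pipeline.getId().equals(roundTripped.getId());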

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -43,6 +44,14 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -211,6 +220,34 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
return null;
}
@Override
public ListPipelineResponseProto listPipelines(
RpcController controller, ListPipelineRequestProto request)
throws ServiceException {
try {
ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
.newBuilder();
List<Pipeline> pipelines = impl.listPipelines();
pipelines.stream().map(Pipeline::getProtobufMessage)
.forEach(builder::addPipelines);
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public ClosePipelineResponseProto closePipeline(
RpcController controller, ClosePipelineRequestProto request)
throws ServiceException {
try {
impl.closePipeline(request.getPipelineID());
return ClosePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public HddsProtos.GetScmInfoRespsonseProto getScmInfo(
RpcController controller, HddsProtos.GetScmInfoRequestProto req)

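Checked IOExceptions are wrapped in ServiceException here and unwrapped back into remote IOExceptions by ProtobufHelper.getRemoteException on the client side. For context, the translator only handles these RPCs once it is registered as a protobuf blocking service with the RPC server; a rough sketch of that standard Hadoop wiring (which lives in the SCM server setup, not in this commit):

import com.google.protobuf.BlockingService;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;

final class WiringSketch {
  static BlockingService toBlockingService(StorageContainerLocationProtocol impl) {
    // The generated service dispatches listPipelines/closePipeline calls to
    // the translator, which forwards them to 'impl' (the SCM).
    return StorageContainerLocationProtocolProtos
        .StorageContainerLocationProtocolService
        .newReflectiveBlockingService(
            new StorageContainerLocationProtocolServerSideTranslatorPB(impl));
  }
}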

@@ -149,6 +149,19 @@ message PipelineResponseProto {
optional string errorMessage = 3;
}
message ListPipelineRequestProto {
}
message ListPipelineResponseProto {
repeated Pipeline pipelines = 1;
}
message ClosePipelineRequestProto {
required PipelineID pipelineID = 1;
}
message ClosePipelineResponseProto {
}
message InChillModeRequestProto {
}
@@ -218,6 +231,18 @@ service StorageContainerLocationProtocolService {
rpc allocatePipeline(PipelineRequestProto)
returns (PipelineResponseProto);
/**
* Returns the list of Pipelines managed by SCM.
*/
rpc listPipelines(ListPipelineRequestProto)
returns (ListPipelineResponseProto);
/**
* Closes a pipeline.
*/
rpc closePipeline(ClosePipelineRequestProto)
returns (ClosePipelineResponseProto);
/**
* Returns information about SCM.
*/


@@ -41,6 +41,8 @@ public interface PipelineManager extends Closeable {
Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
List<Pipeline> getPipelines();
List<Pipeline> getPipelines(ReplicationType type);
List<Pipeline> getPipelines(ReplicationType type,


@@ -56,6 +56,10 @@ class PipelineStateManager {
return pipelineStateMap.getPipeline(pipelineID);
}
public List<Pipeline> getPipelines() {
return pipelineStateMap.getPipelines();
}
List<Pipeline> getPipelines(ReplicationType type) {
return pipelineStateMap.getPipelines(type);
}


@@ -114,6 +114,14 @@ class PipelineStateMap {
return pipeline;
}
/**
* Get list of pipelines in SCM.
* @return List of pipelines
*/
public List<Pipeline> getPipelines() {
return new ArrayList<>(pipelineMap.values());
}
/**
* Get pipeline corresponding to specified replication type.
*

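Note that getPipelines returns a copy (new ArrayList) rather than a view of the internal map, so callers get a stable snapshot:

// Sketch: later addPipeline/removePipeline calls do not mutate 'snapshot',
// and iterating it cannot throw ConcurrentModificationException.
List<Pipeline> snapshot = pipelineStateMap.getPipelines();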

@@ -77,7 +77,7 @@ public final class RatisPipelineUtils {
* @param ozoneConf - Ozone configuration
* @throws IOException
*/
static void destroyPipeline(PipelineManager pipelineManager,
public static void destroyPipeline(PipelineManager pipelineManager,
Pipeline pipeline, Configuration ozoneConf) throws IOException {
final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);


@@ -146,6 +146,16 @@ public class SCMPipelineManager implements PipelineManager {
}
}
@Override
public List<Pipeline> getPipelines() {
lock.readLock().lock();
try {
return stateManager.getPipelines();
} finally {
lock.readLock().unlock();
}
}
@Override
public List<Pipeline> getPipelines(ReplicationType type) {
lock.readLock().lock();


@@ -42,6 +42,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -306,6 +309,21 @@ public class SCMClientProtocolServer implements
return null;
}
@Override
public List<Pipeline> listPipelines() {
return scm.getPipelineManager().getPipelines();
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
PipelineManager pipelineManager = scm.getPipelineManager();
Pipeline pipeline =
pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
RatisPipelineUtils
.finalizeAndDestroyPipeline(pipelineManager, pipeline, conf, false);
}
@Override
public ScmInfo getScmInfo() throws IOException {
ScmInfo.Builder builder =

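Because the lookup goes through PipelineManager.getPipeline, an unknown ID fails with PipelineNotFoundException (declared on the interface above) rather than silently succeeding, and it crosses the RPC boundary as an ordinary remote IOException. A client-side sketch, with scmClient and id purely illustrative:

try {
  scmClient.closePipeline(
      HddsProtos.PipelineID.newBuilder().setId(id).build());
} catch (IOException e) {
  // Also covers PipelineNotFoundException surfaced as a remote exception.
  System.err.println("Failed to close pipeline " + id + ": " + e.getMessage());
}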

@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.scm.cli.container.CreateSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand;
import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -78,7 +80,9 @@ import picocli.CommandLine.Option;
InfoSubcommand.class,
DeleteSubcommand.class,
CreateSubcommand.class,
CloseSubcommand.class
CloseSubcommand.class,
ListPipelinesSubcommand.class,
ClosePipelineSubcommand.class
},
mixinStandardHelpOptions = true)
public class SCMCLI extends GenericCli {


@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli.pipeline;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import picocli.CommandLine;
import java.util.concurrent.Callable;
/**
* Handler of closePipeline command.
*/
@CommandLine.Command(
name = "closePipeline",
description = "Close pipeline",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class ClosePipelineSubcommand implements Callable<Void> {
@CommandLine.ParentCommand
private SCMCLI parent;
@CommandLine.Parameters(description = "ID of the pipeline to close")
private String pipelineId;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.createScmClient()) {
scmClient.closePipeline(
HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
return null;
}
}
}

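Assuming the standard Ozone launcher for the SCM CLI, the new subcommand would be invoked along the lines of

ozone scmcli closePipeline <pipeline-uuid>

with the UUID taken from listPipelines output; the exact entry-point syntax depends on the deployment.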

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli.pipeline;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import picocli.CommandLine;
import java.util.concurrent.Callable;
/**
* Handler of listPipelines command.
*/
@CommandLine.Command(
name = "listPipelines",
description = "List all active pipelines",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class ListPipelinesSubcommand implements Callable<Void> {
@CommandLine.ParentCommand
private SCMCLI parent;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.createScmClient()) {
scmClient.listPipelines().forEach(System.out::println);
return null;
}
}
}

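The listing counterpart prints each pipeline via the new Pipeline.toString shown earlier. Again assuming the standard SCM CLI launcher, something like

ozone scmcli listPipelines

would emit one Pipeline[ Id: <uuid>, Nodes: ..., Type:..., Factor:..., State:... ] line per active pipeline.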

@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Contains all of the pipeline-related SCM commands.
*/
package org.apache.hadoop.hdds.scm.cli.pipeline;


@@ -100,6 +100,9 @@ public class TestPipelineStateManager {
@Test
public void testGetPipelines() throws IOException {
// Initially there should be no pipelines
Assert.assertTrue(stateManager.getPipelines().isEmpty());
Set<Pipeline> pipelines = new HashSet<>();
Pipeline pipeline = createDummyPipeline(1);
stateManager.addPipeline(pipeline);
@@ -113,6 +116,10 @@
Set<Pipeline> pipelines1 = new HashSet<>(stateManager.getPipelines(
HddsProtos.ReplicationType.RATIS));
Assert.assertEquals(pipelines1.size(), pipelines.size());
pipelines1 = new HashSet<>(stateManager.getPipelines());
Assert.assertEquals(pipelines1.size(), pipelines.size());
// clean up
for (Pipeline pipeline1 : pipelines) {
removePipeline(pipeline1);