HDDS-695. Introduce new SCM Commands to list and close Pipelines.
Contributed by Nanda kumar.
This commit is contained in:
parent
0674f11fc0
commit
346afb0a5c
@ -254,6 +254,17 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
|
||||
factor, nodePool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Pipeline> listPipelines() throws IOException {
|
||||
return storageContainerLocationClient.listPipelines();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closePipeline(HddsProtos.PipelineID pipelineID)
|
||||
throws IOException {
|
||||
storageContainerLocationClient.closePipeline(pipelineID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
|
@ -171,4 +171,22 @@ List<HddsProtos.Node> queryNode(HddsProtos.NodeState nodeStatuses,
|
||||
Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
|
||||
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the list of active PipelineIDs.
|
||||
*
|
||||
* @return list of PipelineID
|
||||
*
|
||||
* @throws IOException in case of any exception
|
||||
*/
|
||||
List<Pipeline> listPipelines() throws IOException;
|
||||
|
||||
/**
|
||||
* Closes the pipeline given a pipeline ID.
|
||||
*
|
||||
* @param pipelineID PipelineID to close.
|
||||
*
|
||||
* @throws IOException In case of exception while closing the pipeline
|
||||
*/
|
||||
void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
|
||||
}
|
||||
|
@ -284,18 +284,20 @@ public HddsProtos.ReplicationType getType() {
|
||||
public String toString() {
|
||||
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
|
||||
.append("[");
|
||||
getDatanodes().keySet().forEach(
|
||||
node -> b.append(node.endsWith(getLeaderID()) ? "*" + id : id));
|
||||
b.append(" id:").append(id);
|
||||
b.append(" Id: ").append(id.getId());
|
||||
b.append(", Nodes: ");
|
||||
getDatanodes().values().forEach(b::append);
|
||||
|
||||
if (getType() != null) {
|
||||
b.append(" type:").append(getType().toString());
|
||||
b.append(", Type:").append(getType().toString());
|
||||
}
|
||||
if (getFactor() != null) {
|
||||
b.append(" factor:").append(getFactor().toString());
|
||||
b.append(", Factor:").append(getFactor().toString());
|
||||
}
|
||||
if (getLifeCycleState() != null) {
|
||||
b.append(" State:").append(getLifeCycleState().toString());
|
||||
b.append(", State:").append(getLifeCycleState().toString());
|
||||
}
|
||||
b.append("]");
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
|
@ -126,6 +126,23 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
|
||||
HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the list of active PipelineIDs.
|
||||
*
|
||||
* @return list of PipelineID
|
||||
*
|
||||
* @throws IOException in case of any exception
|
||||
*/
|
||||
List<Pipeline> listPipelines() throws IOException;
|
||||
|
||||
/**
|
||||
* Closes a pipeline given the pipelineID.
|
||||
*
|
||||
* @param pipelineID ID of the pipeline to demolish
|
||||
* @throws IOException
|
||||
*/
|
||||
void closePipeline(HddsProtos.PipelineID pipelineID) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns information about SCM.
|
||||
*
|
||||
|
@ -20,6 +20,9 @@
|
||||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitChillModeResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
|
||||
@ -64,6 +67,7 @@
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class is the client-side translator to translate the requests made on
|
||||
@ -304,6 +308,35 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Pipeline> listPipelines() throws IOException {
|
||||
try {
|
||||
ListPipelineRequestProto request = ListPipelineRequestProto
|
||||
.newBuilder().build();
|
||||
ListPipelineResponseProto response = rpcProxy.listPipelines(
|
||||
NULL_RPC_CONTROLLER, request);
|
||||
return response.getPipelinesList().stream()
|
||||
.map(Pipeline::getFromProtoBuf)
|
||||
.collect(Collectors.toList());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closePipeline(HddsProtos.PipelineID pipelineID)
|
||||
throws IOException {
|
||||
try {
|
||||
ClosePipelineRequestProto request =
|
||||
ClosePipelineRequestProto.newBuilder()
|
||||
.setPipelineID(pipelineID)
|
||||
.build();
|
||||
rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScmInfo getScmInfo() throws IOException {
|
||||
HddsProtos.GetScmInfoRequestProto request =
|
||||
|
@ -34,6 +34,7 @@
|
||||
import org.apache.hadoop.hdds.scm.ScmInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
@ -43,6 +44,14 @@
|
||||
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
@ -211,6 +220,34 @@ public PipelineResponseProto allocatePipeline(
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListPipelineResponseProto listPipelines(
|
||||
RpcController controller, ListPipelineRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
|
||||
.newBuilder();
|
||||
List<Pipeline> pipelineIDs = impl.listPipelines();
|
||||
pipelineIDs.stream().map(Pipeline::getProtobufMessage)
|
||||
.forEach(builder::addPipelines);
|
||||
return builder.build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClosePipelineResponseProto closePipeline(
|
||||
RpcController controller, ClosePipelineRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
impl.closePipeline(request.getPipelineID());
|
||||
return ClosePipelineResponseProto.newBuilder().build();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public HddsProtos.GetScmInfoRespsonseProto getScmInfo(
|
||||
RpcController controller, HddsProtos.GetScmInfoRequestProto req)
|
||||
|
@ -149,6 +149,19 @@ message PipelineResponseProto {
|
||||
optional string errorMessage = 3;
|
||||
}
|
||||
|
||||
message ListPipelineRequestProto {
|
||||
}
|
||||
|
||||
message ListPipelineResponseProto {
|
||||
repeated Pipeline pipelines = 1;
|
||||
}
|
||||
|
||||
message ClosePipelineRequestProto {
|
||||
required PipelineID pipelineID = 1;
|
||||
}
|
||||
|
||||
message ClosePipelineResponseProto {
|
||||
}
|
||||
|
||||
message InChillModeRequestProto {
|
||||
}
|
||||
@ -218,6 +231,18 @@ service StorageContainerLocationProtocolService {
|
||||
rpc allocatePipeline(PipelineRequestProto)
|
||||
returns (PipelineResponseProto);
|
||||
|
||||
/**
|
||||
* Returns the list of Pipelines managed by SCM.
|
||||
*/
|
||||
rpc listPipelines(ListPipelineRequestProto)
|
||||
returns (ListPipelineResponseProto);
|
||||
|
||||
/**
|
||||
* Closes a pipeline.
|
||||
*/
|
||||
rpc closePipeline(ClosePipelineRequestProto)
|
||||
returns (ClosePipelineResponseProto);
|
||||
|
||||
/**
|
||||
* Returns information about SCM.
|
||||
*/
|
||||
|
@ -54,6 +54,8 @@
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.HashMap;
|
||||
@ -167,9 +169,15 @@ public void addContainerToPipeline(PipelineID pipelineID, long containerID) {
|
||||
|
||||
public void removeContainerFromPipeline(PipelineID pipelineID,
|
||||
long containerID) throws IOException {
|
||||
pipeline2ContainerMap.get(pipelineID)
|
||||
.remove(ContainerID.valueof(containerID));
|
||||
closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
|
||||
if (pipeline2ContainerMap.containsKey(pipelineID)) {
|
||||
pipeline2ContainerMap.get(pipelineID)
|
||||
.remove(ContainerID.valueof(containerID));
|
||||
closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
|
||||
} else {
|
||||
LOG.warn("Cannot remove container #{} from pipeline." +
|
||||
" Pipeline #{} not found.",
|
||||
containerID, pipelineID.getId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -341,6 +349,31 @@ private void closePipeline(Pipeline pipeline) throws IOException {
|
||||
manager.closePipeline(pipeline);
|
||||
}
|
||||
|
||||
public List<Pipeline> listPipelines() {
|
||||
return Collections.unmodifiableList(new ArrayList<>(pipelineMap.values()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the given pipeline.
|
||||
*/
|
||||
public void closePipeline(PipelineID pipelineID) throws IOException {
|
||||
final Pipeline pipeline = pipelineMap.get(pipelineID);
|
||||
if (pipeline == null) {
|
||||
// pipeline not found;
|
||||
LOG.warn("Cannot close the pipeline. {} not found!", pipelineID);
|
||||
return;
|
||||
}
|
||||
LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
|
||||
finalizePipeline(pipeline);
|
||||
if (pipeline.getLifeCycleState() != LifeCycleState.CLOSED) {
|
||||
pipelineManagerMap.get(pipeline.getType()).closePipeline(pipeline);
|
||||
pipeline2ContainerMap.remove(pipeline.getId());
|
||||
nodeManager.removePipeline(pipeline);
|
||||
pipelineMap.remove(pipeline.getId());
|
||||
}
|
||||
pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add to a given pipeline.
|
||||
*/
|
||||
|
@ -36,6 +36,7 @@
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
|
||||
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
|
||||
@ -58,6 +59,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerLocationProtocolProtos
|
||||
@ -292,6 +294,18 @@ public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Pipeline> listPipelines() {
|
||||
return scm.getContainerManager().getPipelineSelector().listPipelines();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void closePipeline(HddsProtos.PipelineID pipelineID)
|
||||
throws IOException {
|
||||
PipelineID id = PipelineID.valueOf(UUID.fromString(pipelineID.getId()));
|
||||
scm.getContainerManager().getPipelineSelector().closePipeline(id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScmInfo getScmInfo() throws IOException {
|
||||
ScmInfo.Builder builder =
|
||||
|
@ -0,0 +1,186 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdds.scm.pipelines;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.PipelineReport;
|
||||
import org.apache.hadoop.hdds.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
|
||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
|
||||
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Tests the functionality of PipelineSelector.
|
||||
*/
|
||||
public class TestPipelineSelector {
|
||||
|
||||
@Test
|
||||
public void testListPipelinesWithNoPipeline() throws IOException {
|
||||
String storageDir = GenericTestUtils.getTempPath(
|
||||
TestPipelineSelector.class.getName() + UUID.randomUUID());
|
||||
try {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||
PipelineSelector selector = new PipelineSelector(
|
||||
Mockito.mock(NodeManager.class), conf,
|
||||
Mockito.mock(EventPublisher.class),
|
||||
ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||
Assert.assertTrue(selector.listPipelines().isEmpty());
|
||||
} finally {
|
||||
FileUtil.fullyDelete(new File(storageDir));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListPipelines() throws IOException {
|
||||
String storageDir = GenericTestUtils.getTempPath(
|
||||
TestPipelineSelector.class.getName() + UUID.randomUUID());
|
||||
try {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||
PipelineSelector selector = new PipelineSelector(
|
||||
Mockito.mock(NodeManager.class), conf,
|
||||
Mockito.mock(EventPublisher.class),
|
||||
ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||
getRandomPipeline(selector);
|
||||
getRandomPipeline(selector);
|
||||
getRandomPipeline(selector);
|
||||
getRandomPipeline(selector);
|
||||
getRandomPipeline(selector);
|
||||
Assert.assertEquals(5, selector.listPipelines().size());
|
||||
} finally {
|
||||
FileUtil.fullyDelete(new File(storageDir));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseEmptyPipeline() throws IOException {
|
||||
String storageDir = GenericTestUtils.getTempPath(
|
||||
TestPipelineSelector.class.getName() + UUID.randomUUID());
|
||||
try {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||
PipelineSelector selector = new PipelineSelector(
|
||||
Mockito.mock(NodeManager.class), conf,
|
||||
Mockito.mock(EventPublisher.class),
|
||||
ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||
|
||||
// Create and add pipeline to selector.
|
||||
Pipeline pipelineOne = getRandomPipeline(selector);
|
||||
Pipeline pipelineTwo = getRandomPipeline(selector);
|
||||
|
||||
Assert.assertNotNull(selector.getPipeline(pipelineOne.getId()));
|
||||
Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
|
||||
|
||||
selector.closePipeline(pipelineOne.getId());
|
||||
|
||||
Assert.assertNull(selector.getPipeline(pipelineOne.getId()));
|
||||
Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
|
||||
} finally {
|
||||
FileUtil.fullyDelete(new File(storageDir));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosePipelineWithContainer() throws IOException {
|
||||
String storageDir = GenericTestUtils.getTempPath(
|
||||
TestPipelineSelector.class.getName() + UUID.randomUUID());
|
||||
try {
|
||||
Configuration conf = new OzoneConfiguration();
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, storageDir);
|
||||
PipelineSelector selector = new PipelineSelector(
|
||||
Mockito.mock(NodeManager.class), conf,
|
||||
Mockito.mock(EventPublisher.class),
|
||||
ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
|
||||
|
||||
// Create and add pipeline to selector.
|
||||
Pipeline pipelineOne = getRandomPipeline(selector);
|
||||
Pipeline pipelineTwo = getRandomPipeline(selector);
|
||||
|
||||
selector.addContainerToPipeline(pipelineOne.getId(), 1L);
|
||||
selector.addContainerToPipeline(pipelineOne.getId(), 2L);
|
||||
selector.addContainerToPipeline(pipelineOne.getId(), 3L);
|
||||
selector.addContainerToPipeline(pipelineOne.getId(), 4L);
|
||||
selector.addContainerToPipeline(pipelineOne.getId(), 5L);
|
||||
|
||||
Assert.assertNotNull(selector.getPipeline(pipelineOne.getId()));
|
||||
Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
|
||||
|
||||
Assert.assertEquals(5,
|
||||
selector.getOpenContainerIDsByPipeline(pipelineOne.getId()).size());
|
||||
|
||||
selector.closePipeline(pipelineOne.getId());
|
||||
|
||||
Assert.assertNull(selector.getPipeline(pipelineOne.getId()));
|
||||
Assert.assertNotNull(selector.getPipeline(pipelineTwo.getId()));
|
||||
} finally {
|
||||
FileUtil.fullyDelete(new File(storageDir));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a random pipeline and registers with PipelineSelector.
|
||||
*
|
||||
* @param selector PipelineSelector
|
||||
* @return Pipeline
|
||||
* @throws IOException
|
||||
*/
|
||||
private Pipeline getRandomPipeline(PipelineSelector selector)
|
||||
throws IOException{
|
||||
DatanodeDetails ddOne = TestUtils.randomDatanodeDetails();
|
||||
DatanodeDetails ddTwo = TestUtils.randomDatanodeDetails();
|
||||
DatanodeDetails ddThree = TestUtils.randomDatanodeDetails();
|
||||
Pipeline pipeline = new Pipeline(ddOne.getUuidString(),
|
||||
LifeCycleState.ALLOCATED, ReplicationType.RATIS,
|
||||
ReplicationFactor.THREE, PipelineID.randomId());
|
||||
pipeline.addMember(ddOne);
|
||||
pipeline.addMember(ddTwo);
|
||||
pipeline.addMember(ddThree);
|
||||
selector.updatePipelineState(pipeline, LifeCycleEvent.CREATE);
|
||||
selector.updatePipelineState(pipeline, LifeCycleEvent.CREATED);
|
||||
PipelineReport reportOne = PipelineReport.newBuilder()
|
||||
.setPipelineID(pipeline.getId().getProtobuf()).build();
|
||||
PipelineReportsProto reportsOne = PipelineReportsProto.newBuilder()
|
||||
.addPipelineReport(reportOne).build();
|
||||
pipeline.getDatanodes().values().forEach(
|
||||
dd -> selector.processPipelineReport(dd, reportsOne));
|
||||
return pipeline;
|
||||
}
|
||||
|
||||
}
|
@ -32,6 +32,8 @@
|
||||
import org.apache.hadoop.hdds.scm.cli.container.DeleteSubcommand;
|
||||
import org.apache.hadoop.hdds.scm.cli.container.InfoSubcommand;
|
||||
import org.apache.hadoop.hdds.scm.cli.container.ListSubcommand;
|
||||
import org.apache.hadoop.hdds.scm.cli.pipeline.ClosePipelineSubcommand;
|
||||
import org.apache.hadoop.hdds.scm.cli.pipeline.ListPipelinesSubcommand;
|
||||
import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
|
||||
import org.apache.hadoop.hdds.scm.client.ScmClient;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
|
||||
@ -77,7 +79,9 @@
|
||||
InfoSubcommand.class,
|
||||
DeleteSubcommand.class,
|
||||
CreateSubcommand.class,
|
||||
CloseSubcommand.class
|
||||
CloseSubcommand.class,
|
||||
ListPipelinesSubcommand.class,
|
||||
ClosePipelineSubcommand.class
|
||||
},
|
||||
mixinStandardHelpOptions = true)
|
||||
public class SCMCLI extends GenericCli {
|
||||
|
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdds.scm.cli.pipeline;
|
||||
|
||||
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
|
||||
import org.apache.hadoop.hdds.scm.client.ScmClient;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* Handler of closePipeline command.
|
||||
*/
|
||||
@CommandLine.Command(
|
||||
name = "closePipeline",
|
||||
description = "Close pipeline",
|
||||
mixinStandardHelpOptions = true,
|
||||
versionProvider = HddsVersionProvider.class)
|
||||
public class ClosePipelineSubcommand implements Callable<Void> {
|
||||
|
||||
@CommandLine.ParentCommand
|
||||
private SCMCLI parent;
|
||||
|
||||
@CommandLine.Parameters(description = "ID of the pipeline to close")
|
||||
private String pipelineId;
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
try (ScmClient scmClient = parent.createScmClient()) {
|
||||
scmClient.closePipeline(
|
||||
HddsProtos.PipelineID.newBuilder().setId(pipelineId).build());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,48 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdds.scm.cli.pipeline;
|
||||
|
||||
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
|
||||
import org.apache.hadoop.hdds.scm.cli.SCMCLI;
|
||||
import org.apache.hadoop.hdds.scm.client.ScmClient;
|
||||
import picocli.CommandLine;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* Handler of listPipelines command.
|
||||
*/
|
||||
@CommandLine.Command(
|
||||
name = "listPipelines",
|
||||
description = "List all active pipelines",
|
||||
mixinStandardHelpOptions = true,
|
||||
versionProvider = HddsVersionProvider.class)
|
||||
public class ListPipelinesSubcommand implements Callable<Void> {
|
||||
|
||||
@CommandLine.ParentCommand
|
||||
private SCMCLI parent;
|
||||
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
try (ScmClient scmClient = parent.createScmClient()) {
|
||||
scmClient.listPipelines().forEach(System.out::println);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Contains all of the pipeline related scm commands.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.scm.cli.pipeline;
|
Loading…
x
Reference in New Issue
Block a user