From 75fc51588de33c7d1cf890f870114fd68f32fb74 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Tue, 14 Aug 2018 14:57:46 -0700 Subject: [PATCH] HDDS-298. Implement SCMClientProtocolServer.getContainerWithPipeline for closed containers. Contributed by Ajay Kumar. --- .../hdds/scm/container/ContainerMapping.java | 26 +++++++--- .../container/states/ContainerStateMap.java | 2 +- .../hdds/scm/exceptions/SCMException.java | 3 +- .../scm/container/TestContainerMapping.java | 50 +++++++++++++++++++ 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index e12fcad2f4f..e8392f578ce 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -58,6 +58,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -201,14 +202,25 @@ public class ContainerMapping implements Mapping { HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER .parseFrom(containerBytes); contInfo = ContainerInfo.fromProtobuf(temp); - Pipeline pipeline = pipelineSelector - .getPipeline(contInfo.getPipelineID(), - contInfo.getReplicationType()); - if(pipeline == null) { - pipeline = pipelineSelector - .getReplicationPipeline(contInfo.getReplicationType(), - contInfo.getReplicationFactor()); + Pipeline pipeline; + if (contInfo.isContainerOpen()) { + // If pipeline with given pipeline Id already exist return it + pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID(), + contInfo.getReplicationType()); + if (pipeline == null) { + pipeline = pipelineSelector + .getReplicationPipeline(contInfo.getReplicationType(), + contInfo.getReplicationFactor()); + } + } else { + // For close containers create pipeline from datanodes with replicas + Set dnWithReplicas = containerStateManager + .getContainerReplicas(contInfo.containerID()); + pipeline = new Pipeline(dnWithReplicas.iterator().next().getHostName(), + contInfo.getState(), ReplicationType.STAND_ALONE, + contInfo.getReplicationFactor(), PipelineID.randomId()); + dnWithReplicas.forEach(pipeline::addMember); } return new ContainerWithPipeline(contInfo, pipeline); } finally { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index f840b277492..6c6ce65c7b5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -199,7 +199,7 @@ public class ContainerStateMap { } throw new SCMException( "No entry exist for containerId: " + containerID + " in replica map.", - ResultCodes.FAILED_TO_FIND_CONTAINER); + ResultCodes.NO_REPLICA_FOUND); } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index 00855426eaa..87a29e3a5dc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -117,6 +117,7 @@ public class SCMException extends IOException { UNEXPECTED_CONTAINER_STATE, SCM_NOT_INITIALIZED, DUPLICATE_DATANODE, - NO_SUCH_DATANODE + NO_SUCH_DATANODE, + NO_REPLICA_FOUND } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java index 62695149e97..2dc7e9960a4 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerMapping.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; @@ -30,10 +32,12 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.SCMTestUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -149,6 +153,52 @@ public class TestContainerMapping { newPipeline.getLeader().getUuid()); } + @Test + public void testGetContainerWithPipeline() throws Exception { + ContainerWithPipeline containerWithPipeline = mapping.allocateContainer( + xceiverClientManager.getType(), + xceiverClientManager.getFactor(), + containerOwner); + ContainerInfo contInfo = containerWithPipeline.getContainerInfo(); + // Add dummy replicas for container. + DatanodeDetails dn1 = DatanodeDetails.newBuilder() + .setHostName("host1") + .setIpAddress("1.1.1.1") + .setUuid(UUID.randomUUID().toString()).build(); + DatanodeDetails dn2 = DatanodeDetails.newBuilder() + .setHostName("host2") + .setIpAddress("2.2.2.2") + .setUuid(UUID.randomUUID().toString()).build(); + mapping + .updateContainerState(contInfo.getContainerID(), LifeCycleEvent.CREATE); + mapping.updateContainerState(contInfo.getContainerID(), + LifeCycleEvent.CREATED); + mapping.updateContainerState(contInfo.getContainerID(), + LifeCycleEvent.FINALIZE); + mapping + .updateContainerState(contInfo.getContainerID(), LifeCycleEvent.CLOSE); + ContainerInfo finalContInfo = contInfo; + LambdaTestUtils.intercept(SCMException.class,"No entry exist for " + + "containerId:" , () -> mapping.getContainerWithPipeline( + finalContInfo.getContainerID())); + + mapping.getStateManager().getContainerStateMap() + .addContainerReplica(contInfo.containerID(), dn1, dn2); + + contInfo = mapping.getContainer(contInfo.getContainerID()); + Assert.assertEquals(contInfo.getState(), LifeCycleState.CLOSED); + Pipeline pipeline = containerWithPipeline.getPipeline(); + mapping.getPipelineSelector().finalizePipeline(pipeline); + + ContainerWithPipeline containerWithPipeline2 = mapping + .getContainerWithPipeline(contInfo.getContainerID()); + pipeline = containerWithPipeline2.getPipeline(); + Assert.assertNotEquals(containerWithPipeline, containerWithPipeline2); + Assert.assertNotNull("Pipeline should not be null", pipeline); + Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn1.getHostName())); + Assert.assertTrue(pipeline.getDatanodeHosts().contains(dn2.getHostName())); + } + @Test public void testgetNoneExistentContainer() throws IOException { thrown.expectMessage("Specified key does not exist.");