diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 9d258128da2..b34b35203df 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -79,8 +79,6 @@ public class CloseContainerCommandHandler implements CommandHandler { final ContainerController controller = ozoneContainer.getController(); final long containerId = closeCommand.getContainerID(); try { - // TODO: Closing of QUASI_CLOSED container. - final Container container = controller.getContainer(containerId); if (container == null) { @@ -95,6 +93,11 @@ public class CloseContainerCommandHandler implements CommandHandler { // If the container is part of open pipeline, close it via write channel if (ozoneContainer.getWriteChannel() .isExist(closeCommand.getPipelineID())) { + if (closeCommand.getForce()) { + LOG.warn("Cannot force close a container when the container is" + + " part of an active pipeline."); + return; + } ContainerCommandRequestProto request = getContainerCommandRequestProto(datanodeDetails, closeCommand.getContainerID()); @@ -102,10 +105,14 @@ public class CloseContainerCommandHandler implements CommandHandler { request, closeCommand.getPipelineID()); return; } - - // The container is not part of any open pipeline. - // QUASI_CLOSE the container using ContainerController. - controller.quasiCloseContainer(containerId); + // If we reach here, there is no active pipeline for this container. + if (!closeCommand.getForce()) { + // QUASI_CLOSE the container. + controller.quasiCloseContainer(containerId); + } else { + // SCM told us to force close the container. + controller.closeContainer(containerId); + } } catch (NotLeaderException e) { LOG.debug("Follower cannot close container #{}.", containerId); } catch (IOException e) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index 4fe6ae464f5..96c22acf511 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -30,11 +30,18 @@ public class CloseContainerCommand extends SCMCommand { private final PipelineID pipelineID; + private boolean force; public CloseContainerCommand(final long containerID, final PipelineID pipelineID) { + this(containerID, pipelineID, false); + } + + public CloseContainerCommand(final long containerID, + final PipelineID pipelineID, boolean force) { super(containerID); this.pipelineID = pipelineID; + this.force = force; } /** @@ -62,6 +69,7 @@ public class CloseContainerCommand .setContainerID(getId()) .setCmdId(getId()) .setPipelineID(pipelineID.getProtobuf()) + .setForce(force) .build(); } @@ -69,7 +77,8 @@ public class CloseContainerCommand CloseContainerCommandProto closeContainerProto) { Preconditions.checkNotNull(closeContainerProto); return new CloseContainerCommand(closeContainerProto.getCmdId(), - PipelineID.getFromProtobuf(closeContainerProto.getPipelineID())); + PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()), + closeContainerProto.getForce()); } public long getContainerID() { diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index c5f709cb7b1..050ba9bdc9a 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -293,6 +293,8 @@ message CloseContainerCommandProto { required PipelineID pipelineID = 2; // cmdId will be removed required int64 cmdId = 3; + // Force will be used when closing a container out side of ratis. + optional bool force = 4 [default = false]; } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index a6fad88082f..511feac1916 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -24,7 +25,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; -import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; @@ -44,6 +47,7 @@ import org.mockito.Mockito; import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.Random; import java.util.UUID; /** @@ -52,8 +56,156 @@ import java.util.UUID; public class TestCloseContainerCommandHandler { private final StateContext context = Mockito.mock(StateContext.class); + private final Random random = new Random(); private static File testDir; + @Test + public void testCloseContainerViaRatis() + throws Exception { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer ozoneContainer = + getOzoneContainer(conf, datanodeDetails); + ozoneContainer.start(); + try { + final Container container = + createContainer(conf, datanodeDetails, ozoneContainer); + final long containerId = container.getContainerData().getContainerID(); + final PipelineID pipelineId = PipelineID.valueOf(UUID.fromString( + container.getContainerData().getOriginPipelineId())); + + // We have created a container via ratis. + // Now close the container on ratis. + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + final CloseContainerCommand command = new CloseContainerCommand( + containerId, pipelineId); + + closeHandler.handle(command, ozoneContainer, context, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + ozoneContainer.getContainerSet().getContainer(containerId) + .getContainerState()); + + Mockito.verify(context.getParent(), + Mockito.times(2)).triggerHeartbeat(); + } finally { + ozoneContainer.stop(); + } + } + + @Test + public void testCloseContainerViaStandalone() + throws Exception { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer ozoneContainer = + getOzoneContainer(conf, datanodeDetails); + ozoneContainer.start(); + try { + final Container container = + createContainer(conf, datanodeDetails, ozoneContainer); + final long containerId = container.getContainerData().getContainerID(); + // To quasi close specify a pipeline which doesn't exist in the datanode. + final PipelineID pipelineId = PipelineID.randomId(); + + // We have created a container via ratis. Now quasi close it. + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + final CloseContainerCommand command = new CloseContainerCommand( + containerId, pipelineId); + + closeHandler.handle(command, ozoneContainer, context, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED, + ozoneContainer.getContainerSet().getContainer(containerId) + .getContainerState()); + + Mockito.verify(context.getParent(), + Mockito.times(2)).triggerHeartbeat(); + } finally { + ozoneContainer.stop(); + } + } + + @Test + public void testQuasiCloseToClose() throws Exception { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer ozoneContainer = + getOzoneContainer(conf, datanodeDetails); + ozoneContainer.start(); + try { + final Container container = + createContainer(conf, datanodeDetails, ozoneContainer); + final long containerId = container.getContainerData().getContainerID(); + // A pipeline which doesn't exist in the datanode. + final PipelineID pipelineId = PipelineID.randomId(); + + // We have created a container via ratis. Now quasi close it. + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + final CloseContainerCommand command = new CloseContainerCommand( + containerId, pipelineId); + + closeHandler.handle(command, ozoneContainer, context, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED, + ozoneContainer.getContainerSet().getContainer(containerId) + .getContainerState()); + + Mockito.verify(context.getParent(), + Mockito.times(2)).triggerHeartbeat(); + + // The container is quasi closed. Force close the container now. + final CloseContainerCommand closeCommand = new CloseContainerCommand( + containerId, pipelineId, true); + + closeHandler.handle(closeCommand, ozoneContainer, context, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + ozoneContainer.getContainerSet().getContainer(containerId) + .getContainerState()); + + Mockito.verify(context.getParent(), + Mockito.times(3)).triggerHeartbeat(); + } finally { + ozoneContainer.stop(); + } + } + + @Test + public void testForceCloseOpenContainer() throws Exception { + final OzoneConfiguration conf = new OzoneConfiguration(); + final DatanodeDetails datanodeDetails = randomDatanodeDetails(); + final OzoneContainer ozoneContainer = + getOzoneContainer(conf, datanodeDetails); + ozoneContainer.start(); + try { + final Container container = + createContainer(conf, datanodeDetails, ozoneContainer); + final long containerId = container.getContainerData().getContainerID(); + // A pipeline which doesn't exist in the datanode. + final PipelineID pipelineId = PipelineID.randomId(); + + final CloseContainerCommandHandler closeHandler = + new CloseContainerCommandHandler(); + + final CloseContainerCommand closeCommand = new CloseContainerCommand( + containerId, pipelineId, true); + + closeHandler.handle(closeCommand, ozoneContainer, context, null); + + Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, + ozoneContainer.getContainerSet().getContainer(containerId) + .getContainerState()); + + Mockito.verify(context.getParent(), + Mockito.times(2)).triggerHeartbeat(); + } finally { + ozoneContainer.stop(); + } + } private OzoneContainer getOzoneContainer(final OzoneConfiguration conf, final DatanodeDetails datanodeDetails) throws IOException { @@ -67,19 +219,15 @@ public class TestCloseContainerCommandHandler { Mockito.when(datanodeStateMachine.getDatanodeDetails()) .thenReturn(datanodeDetails); Mockito.when(context.getParent()).thenReturn(datanodeStateMachine); - return new OzoneContainer(datanodeDetails, conf, context); + final OzoneContainer ozoneContainer = new OzoneContainer( + datanodeDetails, conf, context); + ozoneContainer.getDispatcher().setScmId(UUID.randomUUID().toString()); + return ozoneContainer; } - - @Test - public void testCloseContainerViaRatis() - throws IOException, InterruptedException { - final OzoneConfiguration conf = new OzoneConfiguration(); - final DatanodeDetails datanodeDetails = randomDatanodeDetails(); - final OzoneContainer container = getOzoneContainer(conf, datanodeDetails); - container.getDispatcher().setScmId(UUID.randomUUID().toString()); - container.start(); - // Give some time for ratis for leader election. + private Container createContainer(final Configuration conf, + final DatanodeDetails datanodeDetails, + final OzoneContainer ozoneContainer) throws Exception { final PipelineID pipelineID = PipelineID.randomId(); final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId()); final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); @@ -88,9 +236,10 @@ public class TestCloseContainerCommandHandler { Collections.singleton(datanodeDetails)); final RaftClient client = RatisHelper.newRaftClient( SupportedRpcType.GRPC, peer, retryPolicy); - System.out.println(client.groupAdd(group, peer.getId()).isSuccess()); + Assert.assertTrue(client.groupAdd(group, peer.getId()).isSuccess()); Thread.sleep(2000); - final ContainerID containerId = ContainerID.valueof(1); + final ContainerID containerId = ContainerID.valueof( + random.nextLong() & Long.MAX_VALUE); ContainerProtos.ContainerCommandRequestProto.Builder request = ContainerProtos.ContainerCommandRequestProto.newBuilder(); request.setCmdType(ContainerProtos.Type.CreateContainer); @@ -99,79 +248,14 @@ public class TestCloseContainerCommandHandler { ContainerProtos.CreateContainerRequestProto.getDefaultInstance()); request.setTraceID(UUID.randomUUID().toString()); request.setDatanodeUuid(datanodeDetails.getUuidString()); - container.getWriteChannel().submitRequest( + ozoneContainer.getWriteChannel().submitRequest( request.build(), pipelineID.getProtobuf()); + final Container container = ozoneContainer.getContainerSet().getContainer( + containerId.getId()); Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, - container.getContainerSet().getContainer( - containerId.getId()).getContainerState()); - - // We have created a container via ratis. Now close the container on ratis. - final CloseContainerCommandHandler closeHandler = - new CloseContainerCommandHandler(); - final CloseContainerCommand command = new CloseContainerCommand( - containerId.getId(), pipelineID); - - closeHandler.handle(command, container, context, null); - - Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, - container.getContainerSet().getContainer( - containerId.getId()).getContainerState()); - - Mockito.verify(context.getParent(), Mockito.times(2)).triggerHeartbeat(); - container.stop(); - } - - @Test - public void testCloseContainerViaStandalone() - throws IOException, InterruptedException { - final OzoneConfiguration conf = new OzoneConfiguration(); - final DatanodeDetails datanodeDetails = randomDatanodeDetails(); - final OzoneContainer container = getOzoneContainer(conf, datanodeDetails); - container.getDispatcher().setScmId(UUID.randomUUID().toString()); - container.start(); - // Give some time for ratis for leader election. - final PipelineID pipelineID = PipelineID.randomId(); - final RaftGroupId raftGroupId = RaftGroupId.valueOf(pipelineID.getId()); - final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf); - final RaftPeer peer = RatisHelper.toRaftPeer(datanodeDetails); - final RaftGroup group = RatisHelper.newRaftGroup(raftGroupId, - Collections.singleton(datanodeDetails)); - final RaftClient client = RatisHelper.newRaftClient( - SupportedRpcType.GRPC, peer, retryPolicy); - System.out.println(client.groupAdd(group, peer.getId()).isSuccess()); - Thread.sleep(2000); - final ContainerID containerId = ContainerID.valueof(2); - ContainerProtos.ContainerCommandRequestProto.Builder request = - ContainerProtos.ContainerCommandRequestProto.newBuilder(); - request.setCmdType(ContainerProtos.Type.CreateContainer); - request.setContainerID(containerId.getId()); - request.setCreateContainer( - ContainerProtos.CreateContainerRequestProto.getDefaultInstance()); - request.setTraceID(UUID.randomUUID().toString()); - request.setDatanodeUuid(datanodeDetails.getUuidString()); - container.getWriteChannel().submitRequest( - request.build(), pipelineID.getProtobuf()); - - Assert.assertEquals(ContainerProtos.ContainerDataProto.State.OPEN, - container.getContainerSet().getContainer( - containerId.getId()).getContainerState()); - - // We have created a container via ratis. Now quasi close it - final CloseContainerCommandHandler closeHandler = - new CloseContainerCommandHandler(); - // Specify a pipeline which doesn't exist in the datanode. - final CloseContainerCommand command = new CloseContainerCommand( - containerId.getId(), PipelineID.randomId()); - - closeHandler.handle(command, container, context, null); - - Assert.assertEquals(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED, - container.getContainerSet().getContainer( - containerId.getId()).getContainerState()); - - Mockito.verify(context.getParent(), Mockito.times(2)).triggerHeartbeat(); - container.stop(); + container.getContainerState()); + return container; } /**