From ddc0a405074c1e850193ca1d83556f622aa04435 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Fri, 4 Jan 2019 09:04:10 -0800 Subject: [PATCH] HDDS-896. Handle over replicated containers in SCM. Contributed by Nandakumar. --- .../container/common/interfaces/Handler.java | 9 + .../statemachine/DatanodeStateMachine.java | 3 + .../common/statemachine/StateContext.java | 34 ++-- .../CloseContainerCommandHandler.java | 102 +++++------ .../DeleteContainerCommandHandler.java | 84 +++++++++ .../endpoint/HeartbeatEndpointTask.java | 11 ++ .../container/keyvalue/KeyValueHandler.java | 62 ++++--- .../ozoneimpl/ContainerController.java | 12 +- .../commands/CloseContainerCommand.java | 13 +- .../commands/DeleteBlocksCommand.java | 6 +- .../commands/DeleteContainerCommand.java | 62 +++++++ .../commands/ReplicateContainerCommand.java | 4 - .../protocol/commands/ReregisterCommand.java | 11 +- .../ozone/protocol/commands/SCMCommand.java | 2 +- .../TestCloseContainerCommandHandler.java | 18 +- .../command/CommandStatusReportHandler.java | 12 ++ .../DeleteContainerCommandWatcher.java | 56 ++++++ .../scm/container/ReportHandlerHelper.java | 5 +- .../replication/ReplicationManager.java | 168 +++++++++++++++--- .../hadoop/hdds/scm/events/SCMEvents.java | 14 ++ .../replication/TestReplicationManager.java | 95 +++++++--- .../ozoneimpl/TestOzoneContainer.java | 5 +- 22 files changed, 603 insertions(+), 185 deletions(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index 9f520d535b9..972eb8316f2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -146,6 +146,15 @@ public abstract void quasiCloseContainer(Container container) public abstract void closeContainer(Container container) throws IOException; + /** + * Deletes the given container. + * + * @param container container to be deleted + * @throws IOException + */ + public abstract void deleteContainer(Container container) + throws IOException; + public void setScmID(String scmId) { this.scmID = scmId; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index ab860d6793d..7f5233fb0ab 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -40,6 +40,8 @@ .CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .DeleteBlocksCommandHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .DeleteContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .ReplicateContainerCommandHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; @@ -115,6 +117,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(), conf)) .addHandler(new ReplicateContainerCommandHandler(conf, supervisor)) + .addHandler(new DeleteContainerCommandHandler()) .setConnectionManager(connectionManager) .setContainer(container) .setContext(context) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 195e51b9c51..4a979fd5b77 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -19,10 +19,9 @@ import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.protocol.proto @@ -427,22 +426,27 @@ public void addCmdStatus(Long key, CommandStatus status) { * @param cmd - {@link SCMCommand}. */ public void addCmdStatus(SCMCommand cmd) { - if (cmd.getType().equals(Type.closeContainerCommand)) { - // We will be removing CommandStatus completely. - // As a first step, removed it for CloseContainerCommand. - return; + final Optional cmdStatusBuilder; + switch (cmd.getType()) { + case replicateContainerCommand: + cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder()); + break; + case deleteBlocksCommand: + cmdStatusBuilder = Optional.of( + DeleteBlockCommandStatusBuilder.newBuilder()); + break; + case deleteContainerCommand: + cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder()); + break; + default: + cmdStatusBuilder = Optional.empty(); } - CommandStatusBuilder statusBuilder; - if (cmd.getType() == Type.deleteBlocksCommand) { - statusBuilder = new DeleteBlockCommandStatusBuilder(); - } else { - statusBuilder = CommandStatusBuilder.newBuilder(); - } - this.addCmdStatus(cmd.getId(), - statusBuilder.setCmdId(cmd.getId()) + cmdStatusBuilder.ifPresent(statusBuilder -> + addCmdStatus(cmd.getId(), statusBuilder + .setCmdId(cmd.getId()) .setStatus(Status.PENDING) .setType(cmd.getType()) - .build()); + .build())); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index b34b35203df..60a02550781 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -16,7 +16,6 @@ */ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; -import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto @@ -31,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; import org.apache.ratis.protocol.NotLeaderException; @@ -68,61 +68,57 @@ public CloseContainerCommandHandler() { @Override public void handle(SCMCommand command, OzoneContainer ozoneContainer, StateContext context, SCMConnectionManager connectionManager) { + LOG.debug("Processing Close Container command."); + invocationCount++; + final long startTime = Time.monotonicNow(); + final DatanodeDetails datanodeDetails = context.getParent() + .getDatanodeDetails(); + final CloseContainerCommandProto closeCommand = + ((CloseContainerCommand)command).getProto(); + final ContainerController controller = ozoneContainer.getController(); + final long containerId = closeCommand.getContainerID(); try { - LOG.debug("Processing Close Container command."); - invocationCount++; - final long startTime = Time.monotonicNow(); - final DatanodeDetails datanodeDetails = context.getParent() - .getDatanodeDetails(); - final CloseContainerCommandProto closeCommand = - CloseContainerCommandProto.parseFrom(command.getProtoBufMessage()); - final ContainerController controller = ozoneContainer.getController(); - final long containerId = closeCommand.getContainerID(); - try { - final Container container = controller.getContainer(containerId); + final Container container = controller.getContainer(containerId); - if (container == null) { - LOG.error("Container #{} does not exist in datanode. " - + "Container close failed.", containerId); - return; - } - - // Move the container to CLOSING state - controller.markContainerForClose(containerId); - - // If the container is part of open pipeline, close it via write channel - if (ozoneContainer.getWriteChannel() - .isExist(closeCommand.getPipelineID())) { - if (closeCommand.getForce()) { - LOG.warn("Cannot force close a container when the container is" + - " part of an active pipeline."); - return; - } - ContainerCommandRequestProto request = - getContainerCommandRequestProto(datanodeDetails, - closeCommand.getContainerID()); - ozoneContainer.getWriteChannel().submitRequest( - request, closeCommand.getPipelineID()); - return; - } - // If we reach here, there is no active pipeline for this container. - if (!closeCommand.getForce()) { - // QUASI_CLOSE the container. - controller.quasiCloseContainer(containerId); - } else { - // SCM told us to force close the container. - controller.closeContainer(containerId); - } - } catch (NotLeaderException e) { - LOG.debug("Follower cannot close container #{}.", containerId); - } catch (IOException e) { - LOG.error("Can't close container #{}", containerId, e); - } finally { - long endTime = Time.monotonicNow(); - totalTime += endTime - startTime; + if (container == null) { + LOG.error("Container #{} does not exist in datanode. " + + "Container close failed.", containerId); + return; } - } catch (InvalidProtocolBufferException ex) { - LOG.error("Exception while closing container", ex); + + // Move the container to CLOSING state + controller.markContainerForClose(containerId); + + // If the container is part of open pipeline, close it via write channel + if (ozoneContainer.getWriteChannel() + .isExist(closeCommand.getPipelineID())) { + if (closeCommand.getForce()) { + LOG.warn("Cannot force close a container when the container is" + + " part of an active pipeline."); + return; + } + ContainerCommandRequestProto request = + getContainerCommandRequestProto(datanodeDetails, + closeCommand.getContainerID()); + ozoneContainer.getWriteChannel().submitRequest( + request, closeCommand.getPipelineID()); + return; + } + // If we reach here, there is no active pipeline for this container. + if (!closeCommand.getForce()) { + // QUASI_CLOSE the container. + controller.quasiCloseContainer(containerId); + } else { + // SCM told us to force close the container. + controller.closeContainer(containerId); + } + } catch (NotLeaderException e) { + LOG.debug("Follower cannot close container #{}.", containerId); + } catch (IOException e) { + LOG.error("Can't close container #{}", containerId, e); + } finally { + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java new file mode 100644 index 00000000000..2842b1a65a2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.ozone.container.common.statemachine + .SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Handler to process the DeleteContainerCommand from SCM. + */ +public class DeleteContainerCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DeleteContainerCommandHandler.class); + + private int invocationCount; + private long totalTime; + + @Override + public void handle(final SCMCommand command, + final OzoneContainer ozoneContainer, + final StateContext context, + final SCMConnectionManager connectionManager) { + final long startTime = Time.monotonicNow(); + invocationCount++; + try { + final DeleteContainerCommand deleteContainerCommand = + (DeleteContainerCommand) command; + final ContainerController controller = ozoneContainer.getController(); + controller.deleteContainer(deleteContainerCommand.getContainerID()); + updateCommandStatus(context, command, + (cmdStatus) -> cmdStatus.setStatus(true), LOG); + } catch (IOException e) { + updateCommandStatus(context, command, + (cmdStatus) -> cmdStatus.setStatus(false), LOG); + LOG.error("Exception occurred while deleting the container.", e); + } finally { + totalTime += Time.monotonicNow() - startTime; + } + + } + + @Override + public SCMCommandProto.Type getCommandType() { + return SCMCommandProto.Type.deleteContainerCommand; + } + + @Override + public int getInvocationCount() { + return this.invocationCount; + } + + @Override + public long getAverageRunTime() { + return invocationCount == 0 ? 0 : totalTime / invocationCount; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 0c0f1af8e02..513043f6fdb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.slf4j.Logger; @@ -294,6 +295,16 @@ private void processResponse(SCMHeartbeatResponseProto response, } this.context.addCommand(replicateContainerCommand); break; + case deleteContainerCommand: + DeleteContainerCommand deleteContainerCommand = + DeleteContainerCommand.getFromProtobuf( + commandResponseProto.getDeleteContainerCommandProto()); + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM delete container request for container {}", + deleteContainerCommand.getContainerID()); + } + this.context.addCommand(deleteContainerCommand); + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCommandType().name()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 01964badf3d..261dbc45124 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -240,6 +240,7 @@ ContainerCommandResponseProto handleCreateContainer( if (containerSet.getContainer(containerID) == null) { newContainer.create(volumeSet, volumeChoosingPolicy, scmID); containerSet.addContainer(newContainer); + sendICR(newContainer); } else { // The create container request for an already existing container can @@ -335,37 +336,10 @@ ContainerCommandResponseProto handleDeleteContainer( } boolean forceDelete = request.getDeleteContainer().getForceDelete(); - kvContainer.writeLock(); try { - // Check if container is open - if (kvContainer.getContainerData().isOpen()) { - kvContainer.writeUnlock(); - throw new StorageContainerException( - "Deletion of Open Container is not allowed.", - DELETE_ON_OPEN_CONTAINER); - } else if (!forceDelete && kvContainer.getContainerData().getKeyCount() - > 0) { - // If the container is not empty and cannot be deleted forcibly, - // then throw a SCE to stop deleting. - kvContainer.writeUnlock(); - throw new StorageContainerException( - "Container cannot be deleted because it is not empty.", - ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY); - } else { - long containerId = kvContainer.getContainerData().getContainerID(); - containerSet.removeContainer(containerId); - // Release the lock first. - // Avoid holding write locks for disk operations - kvContainer.writeUnlock(); - - kvContainer.delete(forceDelete); - } + deleteInternal(kvContainer, forceDelete); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); - } finally { - if (kvContainer.hasWriteLock()) { - kvContainer.writeUnlock(); - } } return ContainerUtils.getSuccessResponse(request); } @@ -823,6 +797,7 @@ public Container importContainer(long containerID, long maxSize, populateContainerPathFields(container, maxSize); container.importContainerData(rawContainerStream, packer); + sendICR(container); return container; } @@ -877,4 +852,35 @@ public void closeContainer(Container container) container.close(); sendICR(container); } + + @Override + public void deleteContainer(Container container) throws IOException { + deleteInternal(container, true); + } + + private void deleteInternal(Container container, boolean force) + throws StorageContainerException { + container.writeLock(); + try { + // Check if container is open + if (container.getContainerData().isOpen()) { + throw new StorageContainerException( + "Deletion of Open Container is not allowed.", + DELETE_ON_OPEN_CONTAINER); + } + if (!force && container.getContainerData().getKeyCount() > 0) { + // If the container is not empty and cannot be deleted forcibly, + // then throw a SCE to stop deleting. + throw new StorageContainerException( + "Container cannot be deleted because it is not empty.", + ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY); + } + long containerId = container.getContainerData().getContainerID(); + containerSet.removeContainer(containerId); + } finally { + container.writeUnlock(); + } + // Avoid holding write locks for disk operations + container.delete(force); + } } \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java index 1a918edd1c2..4dedd1f9004 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java @@ -94,7 +94,7 @@ public void quasiCloseContainer(final long containerId) throws IOException { } /** - * Closes a container given its id. + * Closes a container given its Id. * * @param containerId Id of the container to close * @throws IOException in case of exception @@ -113,6 +113,16 @@ public Container importContainer(final ContainerType type, originPipelineId, originNodeId, rawContainerStream, packer); } + /** + * Deletes a container given its Id. + * @param containerId Id of the container to be deleted + * @throws IOException + */ + public void deleteContainer(final long containerId) throws IOException { + final Container container = containerSet.getContainer(containerId); + getHandler(container).deleteContainer(container); + } + /** * Given a container, returns its handler instance. * diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index 96c22acf511..ded0464ef4b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -54,16 +54,7 @@ public SCMCommandProto.Type getType() { return SCMCommandProto.Type.closeContainerCommand; } - /** - * Gets the protobuf message of this object. - * - * @return A protobuf message. - */ @Override - public byte[] getProtoBufMessage() { - return getProto().toByteArray(); - } - public CloseContainerCommandProto getProto() { return CloseContainerCommandProto.newBuilder() .setContainerID(getId()) @@ -84,4 +75,8 @@ public static CloseContainerCommand getFromProtobuf( public long getContainerID() { return getId(); } + + public PipelineID getPipelineID() { + return pipelineID; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java index 07feeff6c1b..03a876cee34 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java @@ -56,17 +56,13 @@ public SCMCommandProto.Type getType() { return SCMCommandProto.Type.deleteBlocksCommand; } - @Override - public byte[] getProtoBufMessage() { - return getProto().toByteArray(); - } - public static DeleteBlocksCommand getFromProtobuf( DeleteBlocksCommandProto deleteBlocksProto) { return new DeleteBlocksCommand(deleteBlocksProto .getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId()); } + @Override public DeleteBlocksCommandProto getProto() { return DeleteBlocksCommandProto.newBuilder() .setCmdId(getId()) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java new file mode 100644 index 00000000000..8e0b172e2ac --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.protocol.commands; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeleteContainerCommandProto; + +/** + * SCM command which tells the datanode to delete a container. + */ +public class DeleteContainerCommand extends + SCMCommand { + + private final long containerId; + + public DeleteContainerCommand(long containerId) { + this.containerId = containerId; + } + + @Override + public SCMCommandProto.Type getType() { + return SCMCommandProto.Type.deleteContainerCommand; + } + + @Override + public DeleteContainerCommandProto getProto() { + DeleteContainerCommandProto.Builder builder = + DeleteContainerCommandProto.newBuilder(); + builder.setCmdId(getId()) + .setContainerID(getContainerID()); + return builder.build(); + } + + public long getContainerID() { + return containerId; + } + + public static DeleteContainerCommand getFromProtobuf( + DeleteContainerCommandProto protoMessage) { + Preconditions.checkNotNull(protoMessage); + return new DeleteContainerCommand(protoMessage.getContainerID()); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java index 85302853952..e663bed794f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java @@ -63,10 +63,6 @@ public Type getType() { } @Override - public byte[] getProtoBufMessage() { - return getProto().toByteArray(); - } - public ReplicateContainerCommandProto getProto() { Builder builder = ReplicateContainerCommandProto.newBuilder() .setCmdId(getId()) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java index 09f361d7b6a..e3ea4aeeaff 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java @@ -39,16 +39,6 @@ public SCMCommandProto.Type getType() { return SCMCommandProto.Type.reregisterCommand; } - /** - * Gets the protobuf message of this object. - * - * @return A protobuf message. - */ - @Override - public byte[] getProtoBufMessage() { - return getProto().toByteArray(); - } - /** * Not implemented for ReregisterCommand. * @@ -59,6 +49,7 @@ public long getId() { return 0; } + @Override public ReregisterCommandProto getProto() { return ReregisterCommandProto .newBuilder() diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java index 5773bf1825b..3c4e05b424a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java @@ -49,7 +49,7 @@ public abstract class SCMCommand implements * Gets the protobuf message of this object. * @return A protobuf message. */ - public abstract byte[] getProtoBufMessage(); + public abstract T getProto(); /** * Gets the commandId of this object. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java index 511feac1916..fdd7af879b4 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java @@ -70,6 +70,8 @@ public void testCloseContainerViaRatis() try { final Container container = createContainer(conf, datanodeDetails, ozoneContainer); + Mockito.verify(context.getParent(), + Mockito.times(1)).triggerHeartbeat(); final long containerId = container.getContainerData().getContainerID(); final PipelineID pipelineId = PipelineID.valueOf(UUID.fromString( container.getContainerData().getOriginPipelineId())); @@ -88,7 +90,7 @@ public void testCloseContainerViaRatis() .getContainerState()); Mockito.verify(context.getParent(), - Mockito.times(2)).triggerHeartbeat(); + Mockito.times(3)).triggerHeartbeat(); } finally { ozoneContainer.stop(); } @@ -105,6 +107,8 @@ public void testCloseContainerViaStandalone() try { final Container container = createContainer(conf, datanodeDetails, ozoneContainer); + Mockito.verify(context.getParent(), + Mockito.times(1)).triggerHeartbeat(); final long containerId = container.getContainerData().getContainerID(); // To quasi close specify a pipeline which doesn't exist in the datanode. final PipelineID pipelineId = PipelineID.randomId(); @@ -122,7 +126,7 @@ public void testCloseContainerViaStandalone() .getContainerState()); Mockito.verify(context.getParent(), - Mockito.times(2)).triggerHeartbeat(); + Mockito.times(3)).triggerHeartbeat(); } finally { ozoneContainer.stop(); } @@ -138,6 +142,8 @@ public void testQuasiCloseToClose() throws Exception { try { final Container container = createContainer(conf, datanodeDetails, ozoneContainer); + Mockito.verify(context.getParent(), + Mockito.times(1)).triggerHeartbeat(); final long containerId = container.getContainerData().getContainerID(); // A pipeline which doesn't exist in the datanode. final PipelineID pipelineId = PipelineID.randomId(); @@ -155,7 +161,7 @@ public void testQuasiCloseToClose() throws Exception { .getContainerState()); Mockito.verify(context.getParent(), - Mockito.times(2)).triggerHeartbeat(); + Mockito.times(3)).triggerHeartbeat(); // The container is quasi closed. Force close the container now. final CloseContainerCommand closeCommand = new CloseContainerCommand( @@ -168,7 +174,7 @@ public void testQuasiCloseToClose() throws Exception { .getContainerState()); Mockito.verify(context.getParent(), - Mockito.times(3)).triggerHeartbeat(); + Mockito.times(4)).triggerHeartbeat(); } finally { ozoneContainer.stop(); } @@ -184,6 +190,8 @@ public void testForceCloseOpenContainer() throws Exception { try { final Container container = createContainer(conf, datanodeDetails, ozoneContainer); + Mockito.verify(context.getParent(), + Mockito.times(1)).triggerHeartbeat(); final long containerId = container.getContainerData().getContainerID(); // A pipeline which doesn't exist in the datanode. final PipelineID pipelineId = PipelineID.randomId(); @@ -201,7 +209,7 @@ public void testForceCloseOpenContainer() throws Exception { .getContainerState()); Mockito.verify(context.getParent(), - Mockito.times(2)).triggerHeartbeat(); + Mockito.times(3)).triggerHeartbeat(); } finally { ozoneContainer.stop(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java index 53dfc5a4b47..0ef02a338b1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CommandStatus; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .CommandStatusReportFromDatanode; @@ -57,6 +58,11 @@ public void onMessage(CommandStatusReportFromDatanode report, case replicateContainerCommand: publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new ReplicationStatus(cmdStatus)); + if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) { + publisher.fireEvent(SCMEvents.REPLICATION_COMPLETE, + new ReplicationManager.ReplicationCompleted( + cmdStatus.getCmdId())); + } break; case deleteBlocksCommand: if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) { @@ -64,6 +70,12 @@ public void onMessage(CommandStatusReportFromDatanode report, new DeleteBlockStatus(cmdStatus)); } break; + case deleteContainerCommand: + if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) { + publisher.fireEvent(SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE, + new ReplicationManager.DeleteContainerCommandCompleted( + cmdStatus.getCmdId())); + } default: LOGGER.debug("CommandStatus of type:{} not handled in " + "CommandStatusReportHandler.", cmdStatus.getType()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java new file mode 100644 index 00000000000..0b1e4c82332 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .DeletionRequestToRepeat; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .DeleteContainerCommandCompleted; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventWatcher; +import org.apache.hadoop.ozone.lease.LeaseManager; + +/** + * Command watcher to track the delete container commands. + */ +public class DeleteContainerCommandWatcher extends + EventWatcher { + + public DeleteContainerCommandWatcher( + Event startEvent, + Event completionEvent, + LeaseManager leaseManager) { + super(startEvent, completionEvent, leaseManager); + } + + @Override + protected void onTimeout(EventPublisher publisher, + DeletionRequestToRepeat payload) { + //put back to the original queue + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + payload.getRequest()); + } + + + @Override + protected void onFinished(EventPublisher publisher, + DeletionRequestToRepeat payload) { + + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java index d9c30909f1f..c566ca93670 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java @@ -204,10 +204,7 @@ private static void reconcileContainerState(final ContainerManager manager, .distinct() .count(); - float quasiClosePercent = ((float) uniqueQuasiClosedReplicaCount) / - ((float) replicationFactor); - - if (quasiClosePercent > 0.5F) { + if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) { // Quorum of unique replica has been QUASI_CLOSED long sequenceId = forceCloseContainerReplicaWithHighestSequenceId( container, quasiClosedReplicas, publisher); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index e700ecde9e1..d65e45f90d3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -18,18 +18,24 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -38,11 +44,14 @@ import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_DELETE_CONTAINER_COMMAND; import static org.apache.hadoop.hdds.scm.events.SCMEvents .TRACK_REPLICATE_COMMAND; import org.slf4j.Logger; @@ -63,6 +72,7 @@ public class ReplicationManager implements Runnable { private EventPublisher eventPublisher; private ReplicationCommandWatcher replicationCommandWatcher; + private DeleteContainerCommandWatcher deleteContainerCommandWatcher; private boolean running = true; @@ -80,6 +90,11 @@ public ReplicationManager(ContainerPlacementPolicy containerPlacement, new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND, SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager); + this.deleteContainerCommandWatcher = + new DeleteContainerCommandWatcher(TRACK_DELETE_CONTAINER_COMMAND, + SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE, + commandWatcherLeaseManager); + this.replicationQueue = new ReplicationQueue(); eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER, @@ -108,15 +123,15 @@ public void run() { request = replicationQueue.take(); ContainerID containerID = new ContainerID(request.getContainerId()); - ContainerInfo containerInfo = - containerManager.getContainer(containerID); + ContainerInfo container = containerManager.getContainer(containerID); + final HddsProtos.LifeCycleState state = container.getState(); - Preconditions.checkNotNull(containerInfo, - "No information about the container " + request.getContainerId()); - - Preconditions - .checkState(containerInfo.getState() == LifeCycleState.CLOSED, - "Container should be in closed state"); + if (state != LifeCycleState.CLOSED && + state != LifeCycleState.QUASI_CLOSED) { + LOG.warn("Cannot replicate the container {} when in {} state.", + containerID, state); + continue; + } //check the current replication List containerReplicas = @@ -130,28 +145,41 @@ public void run() { return; } - ReplicationRequest finalRequest = request; + final ReplicationRequest finalRequest = request; int inFlightReplications = replicationCommandWatcher.getTimeoutEvents( - e -> e.request.getContainerId() == finalRequest.getContainerId()) + e -> e.getRequest().getContainerId() + == finalRequest.getContainerId()) + .size(); + + int inFlightDelete = deleteContainerCommandWatcher.getTimeoutEvents( + e -> e.getRequest().getContainerId() + == finalRequest.getContainerId()) .size(); int deficit = - request.getExpecReplicationCount() - containerReplicas.size() - - inFlightReplications; + (request.getExpecReplicationCount() - containerReplicas.size()) + - (inFlightReplications - inFlightDelete); if (deficit > 0) { List datanodes = containerReplicas.stream() + .sorted((r1, r2) -> + r2.getSequenceId().compareTo(r1.getSequenceId())) .map(ContainerReplica::getDatanodeDetails) .collect(Collectors.toList()); List selectedDatanodes = containerPlacement - .chooseDatanodes(datanodes, deficit, - containerInfo.getUsedBytes()); + .chooseDatanodes(datanodes, deficit, container.getUsedBytes()); //send the command for (DatanodeDetails datanode : selectedDatanodes) { + LOG.info("Container {} is under replicated." + + " Expected replica count is {}, but found {}." + + " Re-replicating it on {}.", + container.containerID(), request.getExpecReplicationCount(), + containerReplicas.size(), datanode); + ReplicateContainerCommand replicateCommand = new ReplicateContainerCommand(containerID.getId(), datanodes); @@ -168,8 +196,62 @@ public void run() { } } else if (deficit < 0) { - //TODO: too many replicas. Not handled yet. - LOG.debug("Too many replicas is not handled yet."); + + int numberOfReplicasToDelete = Math.abs(deficit); + + final Map> originIdToDnMap = + new LinkedHashMap<>(); + + containerReplicas.stream() + .sorted(Comparator.comparing(ContainerReplica::getSequenceId)) + .forEach(replica -> { + originIdToDnMap.computeIfAbsent( + replica.getOriginDatanodeId(), key -> new ArrayList<>()); + originIdToDnMap.get(replica.getOriginDatanodeId()) + .add(replica.getDatanodeDetails()); + }); + + for(UUID originId : originIdToDnMap.keySet()) { + final List listOfReplica = + originIdToDnMap.get(originId); + if (listOfReplica.size() > 1) { + final int toDelete = Math.min(listOfReplica.size() - 1, + numberOfReplicasToDelete); + final DeleteContainerCommand deleteContainer = + new DeleteContainerCommand(containerID.getId()); + for (int i = 0; i < toDelete; i++) { + LOG.info("Container {} is over replicated." + + " Expected replica count is {}, but found {}." + + " Deleting the replica on {}.", + container.containerID(), request.getExpecReplicationCount(), + containerReplicas.size(), listOfReplica.get(i)); + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, + new CommandForDatanode<>(listOfReplica.get(i).getUuid(), + deleteContainer)); + DeletionRequestToRepeat timeoutEvent = + new DeletionRequestToRepeat(deleteContainer.getId(), + request); + + eventPublisher.fireEvent( + TRACK_DELETE_CONTAINER_COMMAND, timeoutEvent); + } + numberOfReplicasToDelete -= toDelete; + } + if (numberOfReplicasToDelete == 0) { + break; + } + } + + if (numberOfReplicasToDelete != 0) { + final int expectedReplicaCount = container + .getReplicationFactor().getNumber(); + + LOG.warn("Not able to delete the container replica of Container" + + " {} even though it is over replicated. Expected replica" + + " count is {}, current replica count is {}.", + containerID, expectedReplicaCount, + expectedReplicaCount + numberOfReplicasToDelete); + } } } catch (Exception e) { @@ -196,17 +278,43 @@ public void stop() { } /** - * Event for the ReplicationCommandWatcher to repeate the embedded request. + * Event for the ReplicationCommandWatcher to repeat the embedded request. * in case fof timeout. */ public static class ReplicationRequestToRepeat + extends ContainerRequestToRepeat { + + public ReplicationRequestToRepeat( + long commandId, ReplicationRequest request) { + super(commandId, request); + } + } + + /** + * Event for the DeleteContainerCommandWatcher to repeat the + * embedded request. In case fof timeout. + */ + public static class DeletionRequestToRepeat + extends ContainerRequestToRepeat { + + public DeletionRequestToRepeat( + long commandId, ReplicationRequest request) { + super(commandId, request); + } + } + + /** + * Container Request wrapper which will be used by ReplicationManager to + * perform the intended operation. + */ + public static class ContainerRequestToRepeat implements IdentifiableEventPayload { private final long commandId; private final ReplicationRequest request; - public ReplicationRequestToRepeat(long commandId, + ContainerRequestToRepeat(long commandId, ReplicationRequest request) { this.commandId = commandId; this.request = request; @@ -229,7 +337,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o; + ContainerRequestToRepeat that = (ContainerRequestToRepeat) o; return Objects.equals(request, that.request); } @@ -241,7 +349,7 @@ public int hashCode() { } /** - * Add javadoc. + * Event which indicates that the replicate operation is completed. */ public static class ReplicationCompleted implements IdentifiableEventPayload { @@ -257,4 +365,22 @@ public long getId() { return uuid; } } + + /** + * Event which indicates that the container deletion operation is completed. + */ + public static class DeleteContainerCommandCompleted + implements IdentifiableEventPayload { + + private final long uuid; + + public DeleteContainerCommandCompleted(long uuid) { + this.uuid = uuid; + } + + @Override + public long getId() { + return uuid; + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 72d416b2bf7..51e13062ceb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .NodeReportFromDatanode; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .DeleteContainerCommandCompleted; import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager .ReplicationCompleted; import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; @@ -204,6 +206,14 @@ public final class SCMEvents { public static final TypedEvent TRACK_REPLICATE_COMMAND = new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class); + + /** + * This event is sent by the ReplicaManager to the + * DeleteContainerCommandWatcher to track the in-progress delete commands. + */ + public static final TypedEvent + TRACK_DELETE_CONTAINER_COMMAND = + new TypedEvent<>(ReplicationManager.DeletionRequestToRepeat.class); /** * This event comes from the Heartbeat dispatcher (in fact from the * datanode) to notify the scm that the replication is done. This is @@ -216,6 +226,10 @@ public final class SCMEvents { public static final TypedEvent REPLICATION_COMPLETE = new TypedEvent<>(ReplicationCompleted.class); + public static final TypedEvent + DELETE_CONTAINER_COMMAND_COMPLETE = + new TypedEvent<>(DeleteContainerCommandCompleted.class); + /** * Signal for all the components (but especially for the replication * manager and container report handler) that the replication could be diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 0be279d3a5b..fbe26411492 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -21,13 +21,10 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.stream.IntStream; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto @@ -35,21 +32,24 @@ import org.apache.hadoop.hdds.scm.TestUtils; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; import org.apache.hadoop.hdds.scm.container.ContainerInfo; -import org.apache.hadoop.hdds.scm.pipeline.Pipeline; -import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.container.placement.algorithms .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.container.replication .ReplicationManager.ReplicationRequestToRepeat; +import org.apache.hadoop.hdds.scm.container.replication + .ReplicationManager.DeletionRequestToRepeat; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import com.google.common.base.Preconditions; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND; +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_DELETE_CONTAINER_COMMAND; +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_REPLICATE_COMMAND; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -65,6 +65,7 @@ public class TestReplicationManager { private EventQueue queue; private List trackReplicationEvents; + private List trackDeleteEvents; private List> copyEvents; @@ -87,6 +88,8 @@ public void initReplicationManager() throws IOException { listOfContainerReplica.add(ContainerReplica.newBuilder() .setContainerID(ContainerID.valueof(i)) .setContainerState(ContainerReplicaProto.State.CLOSED) + .setSequenceId(10000L) + .setOriginNodeId(dd.getUuid()) .setDatanodeDetails(dd).build()); }); @@ -119,6 +122,10 @@ public void initReplicationManager() throws IOException { queue.addHandler(TRACK_REPLICATE_COMMAND, (event, publisher) -> trackReplicationEvents.add(event)); + trackDeleteEvents = new ArrayList<>(); + queue.addHandler(TRACK_DELETE_CONTAINER_COMMAND, + (event, publisher) -> trackDeleteEvents.add(event)); + copyEvents = new ArrayList<>(); queue.addHandler(SCMEvents.DATANODE_COMMAND, (event, publisher) -> copyEvents.add(event)); @@ -128,8 +135,6 @@ public void initReplicationManager() throws IOException { replicationManager = new ReplicationManager(containerPlacementPolicy, containerManager, queue, leaseManager); - - } /** @@ -160,6 +165,57 @@ public void testNoExistingReplicas() throws InterruptedException { } } + @Test + public void testOverReplication() throws ContainerNotFoundException, + InterruptedException { + try { + leaseManager.start(); + replicationManager.start(); + + final ContainerID containerID = ContainerID.valueof(5L); + + final ContainerReplica duplicateReplicaOne = ContainerReplica.newBuilder() + .setContainerID(containerID) + .setContainerState(ContainerReplicaProto.State.CLOSED) + .setSequenceId(10000L) + .setOriginNodeId(listOfDatanodeDetails.get(0).getUuid()) + .setDatanodeDetails(listOfDatanodeDetails.get(3)) + .build(); + + final ContainerReplica duplicateReplicaTwo = ContainerReplica.newBuilder() + .setContainerID(containerID) + .setContainerState(ContainerReplicaProto.State.CLOSED) + .setSequenceId(10000L) + .setOriginNodeId(listOfDatanodeDetails.get(1).getUuid()) + .setDatanodeDetails(listOfDatanodeDetails.get(4)) + .build(); + + when(containerManager.getContainerReplicas(new ContainerID(5L))) + .thenReturn(new HashSet<>(Arrays.asList( + listOfContainerReplica.get(0), + listOfContainerReplica.get(1), + listOfContainerReplica.get(2), + duplicateReplicaOne, + duplicateReplicaTwo + ))); + + queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest(5L, (short) 5, System.currentTimeMillis(), + (short) 3)); + Thread.sleep(500L); + queue.processAll(1000L); + + //THEN + Assert.assertEquals(2, trackDeleteEvents.size()); + Assert.assertEquals(2, copyEvents.size()); + + } finally { + if (leaseManager != null) { + leaseManager.shutdown(); + } + } + } + @Test public void testEventSending() throws InterruptedException, IOException { @@ -196,6 +252,7 @@ public void testCommandWatcher() throws InterruptedException, IOException { containerManager, queue, rapidLeaseManager); try { + leaseManager.start(); rapidLeaseManager.start(); replicationManager.start(); @@ -223,25 +280,11 @@ public void testCommandWatcher() throws InterruptedException, IOException { Assert.assertEquals(2, copyEvents.size()); } finally { - if (rapidLeaseManager != null) { - rapidLeaseManager.shutdown(); + rapidLeaseManager.shutdown(); + if (leaseManager != null) { + leaseManager.shutdown(); } } } - public static Pipeline createPipeline(Iterable ids) - throws IOException { - Objects.requireNonNull(ids, "ids == null"); - Preconditions.checkArgument(ids.iterator().hasNext()); - List dns = new ArrayList<>(); - ids.forEach(dns::add); - return Pipeline.newBuilder() - .setState(Pipeline.PipelineState.OPEN) - .setId(PipelineID.randomId()) - .setType(HddsProtos.ReplicationType.STAND_ALONE) - .setFactor(ReplicationFactor.ONE) - .setNodes(dns) - .build(); - } - } \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java index cd0b0609ac9..d58466f8870 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java @@ -75,13 +75,12 @@ public void testCreateOzoneContainer() throws Exception { conf.setBoolean( OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false); - DatanodeDetails datanodeDetails = Mockito.mock(DatanodeDetails.class); + DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails(); StateContext context = Mockito.mock(StateContext.class); DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class); Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails); Mockito.when(context.getParent()).thenReturn(dsm); - container = new OzoneContainer(TestUtils.randomDatanodeDetails(), - conf, context); + container = new OzoneContainer(datanodeDetails, conf, context); //Setting scmId, as we start manually ozone container. container.getDispatcher().setScmId(UUID.randomUUID().toString()); container.start();