From eef437d5e26843efd14327d2cf2dcbcef4aa408b Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Mon, 28 Aug 2017 10:04:46 +0800 Subject: [PATCH] HDFS-12282. Ozone: DeleteKey-4: Block delete via HB between SCM and DN. Contributed by Weiwei Yang. --- .../DeletedContainerBlocksSummary.java | 103 +++++++++ .../statemachine/DatanodeStateMachine.java | 14 +- .../background/BlockDeletingService.java | 30 ++- .../DeleteBlocksCommandHandler.java | 198 ++++++++++++++++ .../endpoint/HeartbeatEndpointTask.java | 15 ++ .../StorageContainerDatanodeProtocol.java | 12 +- .../commands/DeleteBlocksCommand.java | 63 +++++ ...atanodeProtocolClientSideTranslatorPB.java | 14 ++ ...atanodeProtocolServerSideTranslatorPB.java | 13 ++ .../ozone/scm/StorageContainerManager.java | 66 +++++- .../hadoop/ozone/scm/block/BlockManager.java | 17 ++ .../ozone/scm/block/BlockManagerImpl.java | 43 ++++ .../ozone/scm/block/DeletedBlockLog.java | 21 +- .../ozone/scm/block/DeletedBlockLogImpl.java | 53 ++++- .../scm/block/SCMBlockDeletingService.java | 217 ++++++++++++++++++ .../hadoop/ozone/scm/node/NodeManager.java | 9 + .../hadoop/ozone/scm/node/SCMNodeManager.java | 5 + .../hadoop/utils/BackgroundTaskResult.java | 17 +- .../StorageContainerDatanodeProtocol.proto | 27 ++- .../ozone/TestStorageContainerManager.java | 116 +++++++++- .../TestStorageContainerManagerHelper.java | 180 +++++++++++++++ .../ozone/container/common/ScmTestMock.java | 9 + 22 files changed, 1216 insertions(+), 26 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java new file mode 100644 index 00000000000..de5e2d09ae7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.ozone.container.common.helpers; + +import com.google.common.collect.Maps; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A helper class to wrap the info about under deletion container blocks. + */ +public final class DeletedContainerBlocksSummary { + + private final List blocks; + // key : txID + // value : times of this tx has been processed + private final Map txSummary; + // key : container name + // value : the number of blocks need to be deleted in this container + // if the message contains multiple entries for same block, + // blocks will be merged + private final Map blockSummary; + // total number of blocks in this message + private int numOfBlocks; + + private DeletedContainerBlocksSummary(List blocks) { + this.blocks = blocks; + txSummary = Maps.newHashMap(); + blockSummary = Maps.newHashMap(); + blocks.forEach(entry -> { + txSummary.put(entry.getTxID(), entry.getCount()); + if (blockSummary.containsKey(entry.getContainerName())) { + blockSummary.put(entry.getContainerName(), + blockSummary.get(entry.getContainerName()) + + entry.getBlockIDCount()); + } else { + blockSummary.put(entry.getContainerName(), entry.getBlockIDCount()); + } + numOfBlocks += entry.getBlockIDCount(); + }); + } + + public static DeletedContainerBlocksSummary getFrom( + List blocks) { + return new DeletedContainerBlocksSummary(blocks); + } + + public int getNumOfBlocks() { + return numOfBlocks; + } + + public int getNumOfContainers() { + return blockSummary.size(); + } + + public String getTXIDs() { + return String.join(",", txSummary.keySet() + .stream().map(String::valueOf).collect(Collectors.toList())); + } + + public String getTxIDSummary() { + List txSummaryEntry = txSummary.entrySet().stream() + .map(entry -> entry.getKey() + "(" + entry.getValue() + ")") + .collect(Collectors.toList()); + return "[" + String.join(",", txSummaryEntry) + "]"; + } + + @Override public String toString() { + StringBuffer sb = new StringBuffer(); + for (DeletedBlocksTransaction blks : blocks) { + sb.append(" ") + .append("TXID=") + .append(blks.getTxID()) + .append(", ") + .append("TimesProceed=") + .append(blks.getCount()) + .append(", ") + .append(blks.getContainerName()) + .append(" : [") + .append(String.join(",", blks.getBlockIDList())).append("]") + .append("\n"); + } + return sb.toString(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 66992af2b83..45e47db905a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler; import 
org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; @@ -77,15 +78,16 @@ public class DatanodeStateMachine implements Closeable { this.datanodeID = datanodeID; nextHB = new AtomicLong(Time.monotonicNow()); - // When we add new handlers just adding a new handler here should do the // trick. commandDispatcher = CommandDispatcher.newBuilder() - .addHandler(new ContainerReportHandler()) - .setConnectionManager(connectionManager) - .setContainer(container) - .setContext(context) - .build(); + .addHandler(new ContainerReportHandler()) + .addHandler(new DeleteBlocksCommandHandler( + container.getContainerManager(), conf)) + .setConnectionManager(connectionManager) + .setContainer(container) + .setContext(context) + .build(); } public void setDatanodeID(DatanodeID datanodeID) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java index 498264ef0f0..48cf329528b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; @@ -116,6 +117,11 @@ public class BlockDeletingService extends BackgroundService{ LOG.warn("Failed to initiate block deleting tasks, " + "caused by unable to get containers info. " + "Retry in next interval. ", e); + } catch (Exception e) { + // In case listContainer call throws any uncaught RuntimeException. 
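+      // Swallowing the exception here keeps the background executor alive;
+      // the task will simply be scheduled again on the next service
+      // interval.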
+ if (LOG.isDebugEnabled()) { + LOG.debug("Unexpected error occurs during deleting blocks.", e); + } } return queue; } @@ -159,6 +165,7 @@ public class BlockDeletingService extends BackgroundService{ @Override public BackgroundTaskResult call() throws Exception { + ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); long startTime = Time.monotonicNow(); // Scan container's db and get list of under deletion blocks MetadataStore meta = KeyUtils.getDB(containerData, conf); @@ -175,17 +182,24 @@ public class BlockDeletingService extends BackgroundService{ List succeedBlocks = new LinkedList<>(); LOG.debug("Container : {}, To-Delete blocks : {}", containerData.getContainerName(), toDeleteBlocks.size()); + File dataDir = ContainerUtils.getDataDirectory(containerData).toFile(); + if (!dataDir.exists() || !dataDir.isDirectory()) { + LOG.error("Invalid container data dir {} : " + + "not exist or not a directory", dataDir.getAbsolutePath()); + return crr; + } + toDeleteBlocks.forEach(entry -> { String blockName = DFSUtil.bytes2String(entry.getKey()); LOG.debug("Deleting block {}", blockName); try { ContainerProtos.KeyData data = ContainerProtos.KeyData.parseFrom(entry.getValue()); - for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) { - File chunkFile = new File(chunkInfo.getChunkName()); + File chunkFile = dataDir.toPath() + .resolve(chunkInfo.getChunkName()).toFile(); if (FileUtils.deleteQuietly(chunkFile)) { - LOG.debug("block {} chunk {} deleted", blockName, + LOG.info("block {} chunk {} deleted", blockName, chunkFile.getAbsolutePath()); } } @@ -201,11 +215,11 @@ public class BlockDeletingService extends BackgroundService{ batch.delete(DFSUtil.string2Bytes(entry))); meta.writeBatch(batch); - LOG.info("The elapsed time of task@{} for" - + " deleting blocks: {}ms.", - Integer.toHexString(this.hashCode()), - Time.monotonicNow() - startTime); - ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult(); + if (!succeedBlocks.isEmpty()) { + LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms", + containerData.getContainerName(), succeedBlocks.size(), + Time.monotonicNow() - startTime); + } crr.addAll(succeedBlocks); return crr; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java new file mode 100644 index 00000000000..a833cdfe63c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary; +import org.apache.hadoop.ozone.container.common.helpers.KeyUtils; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager; +import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Handle block deletion commands. + */ +public class DeleteBlocksCommandHandler implements CommandHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(DeleteBlocksCommandHandler.class); + + private ContainerManager containerManager; + private Configuration conf; + private int invocationCount; + private long totalTime; + + public DeleteBlocksCommandHandler(ContainerManager containerManager, + Configuration conf) { + this.containerManager = containerManager; + this.conf = conf; + } + + @Override + public void handle(SCMCommand command, OzoneContainer container, + StateContext context, SCMConnectionManager connectionManager) { + if (command.getType() != Type.deleteBlocksCommand) { + LOG.warn("Skipping handling command, expected command " + + "type {} but found {}", + Type.deleteBlocksCommand, command.getType()); + return; + } + LOG.debug("Processing block deletion command."); + invocationCount++; + long startTime = Time.monotonicNow(); + + // move blocks to deleting state. + // this is a metadata update, the actual deletion happens in another + // recycling thread. 
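+    // Every transaction gets its own success/failure entry in the ACK
+    // built below, so one failed container does not block the others.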
+ DeleteBlocksCommand cmd = (DeleteBlocksCommand) command; + List containerBlocks = cmd.blocksTobeDeleted(); + + + DeletedContainerBlocksSummary summary = + DeletedContainerBlocksSummary.getFrom(containerBlocks); + LOG.info("Start to delete container blocks, TXIDs={}, " + + "numOfContainers={}, numOfBlocks={}", + summary.getTxIDSummary(), + summary.getNumOfContainers(), + summary.getNumOfBlocks()); + + ContainerBlocksDeletionACKProto.Builder resultBuilder = + ContainerBlocksDeletionACKProto.newBuilder(); + containerBlocks.forEach(entry -> { + DeleteBlockTransactionResult.Builder txResultBuilder = + DeleteBlockTransactionResult.newBuilder(); + txResultBuilder.setTxID(entry.getTxID()); + try { + deleteContainerBlocks(entry, conf); + txResultBuilder.setSuccess(true); + } catch (IOException e) { + LOG.warn("Failed to delete blocks for container={}, TXID={}", + entry.getContainerName(), entry.getTxID(), e); + txResultBuilder.setSuccess(false); + } + resultBuilder.addResults(txResultBuilder.build()); + }); + ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build(); + + // Send ACK back to SCM as long as meta updated + // TODO Or we should wait until the blocks are actually deleted? + if (!containerBlocks.isEmpty()) { + for (EndpointStateMachine endPoint : connectionManager.getValues()) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending following block deletion ACK to SCM"); + for (DeleteBlockTransactionResult result : + blockDeletionACK.getResultsList()) { + LOG.debug(result.getTxID() + " : " + result.getSuccess()); + } + } + endPoint.getEndPoint() + .sendContainerBlocksDeletionACK(blockDeletionACK); + } catch (IOException e) { + LOG.error("Unable to send block deletion ACK to SCM {}", + endPoint.getAddress().toString(), e); + } + } + } + + long endTime = Time.monotonicNow(); + totalTime += endTime - startTime; + } + + /** + * Move a bunch of blocks from a container to deleting state. + * This is a meta update, the actual deletes happen in async mode. + * + * @param delTX a block deletion transaction. + * @param config configuration. + * @throws IOException if I/O error occurs. + */ + private void deleteContainerBlocks(DeletedBlocksTransaction delTX, + Configuration config) throws IOException { + String containerId = delTX.getContainerName(); + ContainerData containerInfo = containerManager.readContainer(containerId); + if (LOG.isDebugEnabled()) { + LOG.debug("Processing Container : {}, DB path : {}", containerId, + containerInfo.getDBPath()); + } + MetadataStore containerDB = KeyUtils.getDB(containerInfo, config); + for (String blk : delTX.getBlockIDList()) { + BatchOperation batch = new BatchOperation(); + byte[] blkBytes = DFSUtil.string2Bytes(blk); + byte[] blkInfo = containerDB.get(blkBytes); + if (blkInfo != null) { + // Found the block in container db, + // use an atomic update to change its state to deleting. + batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk), + blkInfo); + batch.delete(blkBytes); + try { + containerDB.writeBatch(batch); + LOG.info("Transited Block {} to DELETING state in container {}", + blk, containerId); + } catch (IOException e) { + // if some blocks failed to delete, we fail this TX, + // without sending this ACK to SCM, SCM will resend the TX + // with a certain number of retries. 
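+          // Rethrowing fails only this transaction; the handler still
+          // processes and ACKs the remaining transactions independently.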
+ throw new IOException( + "Failed to delete blocks for TXID = " + delTX.getTxID(), e); + } + } else { + LOG.info("Block {} not found or already under deletion in" + + " container {}, skip deleting it.", blk, containerId); + } + } + } + + @Override + public Type getCommandType() { + return Type.deleteBlocksCommand; + } + + @Override + public int getInvocationCount() { + return this.invocationCount; + } + + @Override + public long getAverageRunTime() { + if (invocationCount > 0) { + return totalTime / invocationCount; + } + return 0; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 764e6d2a46d..d68351ce51c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -21,11 +21,14 @@ package org.apache.hadoop.ozone.container.common.states.endpoint; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.container.common.helpers + .DeletedContainerBlocksSummary; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine.EndPointStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; @@ -144,6 +147,18 @@ public class HeartbeatEndpointTask } } break; + case deleteBlocksCommand: + DeleteBlocksCommand db = DeleteBlocksCommand + .getFromProtobuf(commandResponseProto.getDeleteBlocksProto()); + if (!db.blocksTobeDeleted().isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug(DeletedContainerBlocksSummary + .getFrom(db.blocksTobeDeleted()) + .toString()); + } + this.context.addCommand(db); + } + break; default: throw new IllegalArgumentException("Unknown response : " + commandResponseProto.getCmdType().name()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java index 75bd771a52e..a7b87174c7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java @@ -25,7 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; - +import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; import java.io.IOException; /** @@ -70,4 +71,13 @@ public interface StorageContainerDatanodeProtocol { */ SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports) throws IOException; + + /** + * Used by datanode to send block deletion ACK to SCM. + * @param request block deletion transactions. + * @return block deletion transaction response. + * @throws IOException + */ + ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + ContainerBlocksDeletionACKProto request) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java new file mode 100644 index 00000000000..8e3463d9550 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; + +import java.util.List; + +/** + * A SCM command asks a datanode to delete a number of blocks. 
+ */ +public class DeleteBlocksCommand extends + SCMCommand { + + private List blocksTobeDeleted; + + + public DeleteBlocksCommand(List blocks) { + this.blocksTobeDeleted = blocks; + } + + public List blocksTobeDeleted() { + return this.blocksTobeDeleted; + } + + @Override + public Type getType() { + return Type.deleteBlocksCommand; + } + + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public static DeleteBlocksCommand getFromProtobuf( + SCMDeleteBlocksCmdResponseProto deleteBlocksProto) { + return new DeleteBlocksCommand(deleteBlocksProto + .getDeletedBlocksTransactionsList()); + } + + public SCMDeleteBlocksCmdResponseProto getProto() { + return SCMDeleteBlocksCmdResponseProto.newBuilder() + .addAllDeletedBlocksTransactions(blocksTobeDeleted).build(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java index e0e3bee8c3f..033513dc9ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java @@ -38,6 +38,8 @@ import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; import java.io.Closeable; import java.io.IOException; @@ -177,4 +179,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB return resp; } + @Override + public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + ContainerBlocksDeletionACKProto deletedBlocks) throws IOException { + final ContainerBlocksDeletionACKResponseProto resp; + try { + resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER, + deletedBlocks); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + return resp; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java index 116cc382819..9808f3ef683 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java @@ -24,6 +24,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; import java.io.IOException; @@ -97,4 +99,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB throw new ServiceException(e); } } + + @Override + public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + RpcController controller, ContainerBlocksDeletionACKProto request) + throws ServiceException { + try { + return impl.sendContainerBlocksDeletionACK(request); + } catch (IOException e) { + throw new ServiceException(e); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index fa14ad0a0ef..5a97735e172 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; @@ -52,6 +53,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto; import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB; @@ -91,6 +95,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.Collections; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.protocol.proto .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result; @@ -322,8 +328,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl * @throws InvalidProtocolBufferException */ @VisibleForTesting - public static SCMCommandResponseProto 
getCommandResponse(SCMCommand cmd)
-      throws InvalidProtocolBufferException {
+  public SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
+      throws IOException {
     Type type = cmd.getType();
     SCMCommandResponseProto.Builder builder =
         SCMCommandResponseProto.newBuilder();
@@ -346,6 +352,17 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
           .setReregisterProto(SCMReregisterCmdResponseProto
               .getDefaultInstance())
           .build();
+    case deleteBlocksCommand:
+      // Once SCM sends out the deletion message, increment the count.
+      // This is done here instead of when SCM receives the ACK, because
+      // the DN might not be able to respond with an ACK for some time; if
+      // the command times out, SCM needs to re-send it a few more times.
+      List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
+          .stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
+      this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
+      return builder.setCmdType(Type.deleteBlocksCommand)
+          .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Not implemented");
     }
@@ -591,6 +608,7 @@
         datanodeRpcAddress));
     datanodeRpcServer.start();
     httpServer.start();
+    scmBlockManager.start();

     setStartTime();

@@ -628,6 +646,13 @@
       LOG.error("Storage Container Manager HTTP server stop failed.", ex);
     }

+    try {
+      LOG.info("Stopping Block Manager Service.");
+      scmBlockManager.stop();
+    } catch (Exception ex) {
+      LOG.error("SCM block manager service stop failed.", ex);
+    }
+
     unregisterMXBean();
     IOUtils.cleanupWithLogger(LOG, scmContainerManager);
     IOUtils.cleanupWithLogger(LOG, scmBlockManager);
@@ -714,6 +739,38 @@
         .build();
   }

+  /**
+   * Handles the block deletion ACKs sent by datanodes. Once an ACK is
+   * received, SCM considers the corresponding blocks deleted and updates
+   * the metadata in the SCM DB.
+   *
+   * @param acks block deletion ACKs, one result per transaction.
+   * @return a (currently empty) ACK response.
+   * @throws IOException
+   */
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto acks) throws IOException {
+    if (acks.getResultsCount() > 0) {
+      List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
+      for (DeleteBlockTransactionResult result : resultList) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got block deletion ACK from datanode, TXID={}, "
+              + "success={}", result.getTxID(), result.getSuccess());
+        }
+        if (result.getSuccess()) {
+          LOG.info("Purging TXID={} from block deletion log",
+              result.getTxID());
+          this.getScmBlockManager().getDeletedBlockLog()
+              .commitTransactions(Collections.singletonList(result.getTxID()));
+        } else {
+          LOG.warn("Got failed ACK for TXID={}, will resend the "
+              + "TX in the next interval", result.getTxID());
+        }
+      }
+    }
+    return ContainerBlocksDeletionACKResponseProto.newBuilder()
+        .getDefaultInstanceForType();
+  }
+
   /**
    * Returns the Number of Datanodes that are communicating with SCM.
    *
@@ -742,6 +799,11 @@
     return scmNodeManager;
   }

+  @VisibleForTesting
+  public BlockManager getScmBlockManager() {
+    return scmBlockManager;
+  }
+
   /**
    * Get block locations.
    * @param keys batch of block keys to retrieve.
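A minimal sketch of the proto round trip that getCommandResponse() and the
datanode's HeartbeatEndpointTask rely on, using only the classes this patch
adds; the field values are illustrative, and the builder calls assume the
DeletedBlocksTransaction message exposes the txID, containerName, blockID,
and count fields used throughout the patch:

    import java.util.Collections;

    import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
    import org.apache.hadoop.ozone.protocol.proto
        .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;

    public class DeleteBlocksRoundTripSketch {
      public static void main(String[] args) {
        // One deletion transaction: delete a single block from a container.
        DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
            .setTxID(1L)                      // illustrative TX id
            .setContainerName("container-1")  // illustrative container name
            .addBlockID("block-1")            // illustrative block name
            .setCount(0)                      // not yet re-sent by SCM
            .build();

        // SCM side: SCMBlockDeletingService wraps TXs in a command, and
        // getCommandResponse() embeds cmd.getProto() in the HB response.
        DeleteBlocksCommand cmd =
            new DeleteBlocksCommand(Collections.singletonList(tx));

        // Datanode side: HeartbeatEndpointTask rebuilds the command from
        // the same proto before dispatching it to the command handler.
        DeleteBlocksCommand decoded =
            DeleteBlocksCommand.getFromProtobuf(cmd.getProto());
        System.out.println(decoded.blocksTobeDeleted().get(0).getTxID()); // 1
      }
    }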
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java index d1487fc7382..4672b338f7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java @@ -51,4 +51,21 @@ public interface BlockManager extends Closeable { * @throws IOException */ void deleteBlock(String key) throws IOException; + + /** + * @return the block deletion transaction log maintained by SCM. + */ + DeletedBlockLog getDeletedBlockLog(); + + /** + * Start block manager background services. + * @throws IOException + */ + void start() throws IOException; + + /** + * Shutdown block manager background services. + * @throws IOException + */ + void stop() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java index 43ca21cc58e..d920c429979 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.List; @@ -70,6 +71,14 @@ import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes. FAILED_TO_LOAD_OPEN_CONTAINER; import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes. INVALID_BLOCK_SIZE; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; /** * Block Manager manages the block access for SCM. @@ -89,6 +98,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { // Track all containers owned by block service. private final MetadataStore containerStore; private final DeletedBlockLog deletedBlockLog; + private final SCMBlockDeletingService blockDeletingService; private Map> containers; @@ -143,7 +153,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { this.lock = new ReentrantLock(); mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); + + // SCM block deleting transaction log and deleting service. deletedBlockLog = new DeletedBlockLogImpl(conf); + int svcInterval = conf.getInt( + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT); + long serviceTimeout = conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + blockDeletingService = new SCMBlockDeletingService(deletedBlockLog, + containerManager, nodeManager, svcInterval, serviceTimeout); + } + + /** + * Start block manager services. 
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    this.blockDeletingService.start();
+  }
+
+  /**
+   * Shutdown block manager services.
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    this.blockDeletingService.shutdown();
+    this.close();
+  }

   // TODO: close full (or almost full) containers with a separate thread.
@@ -475,6 +512,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     }
   }

+  @Override
+  public DeletedBlockLog getDeletedBlockLog() {
+    return this.deletedBlockLog;
+  }
+
   @VisibleForTesting
   public String getDeletedKeyName(String key) {
     return StringUtils.format(".Deleted/%s", key);
@@ -495,6 +537,7 @@
     if (deletedBlockLog != null) {
       deletedBlockLog.close();
     }
+    blockDeletingService.shutdown();
     MBeans.unregister(mxBean);
   }

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index 60d53af45d6..9e268a63f66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.scm.block;

-
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;

@@ -45,6 +44,17 @@ public interface DeletedBlockLog extends Closeable {
   List<DeletedBlocksTransaction> getTransactions(int count)
       throws IOException;

+  /**
+   * Returns all failed transactions in the log. A transaction is considered
+   * failed if it has been sent more than MAX_RETRY times, after which its
+   * count is reset to -1.
+   *
+   * @return a list of failed deleted block transactions.
+   * @throws IOException
+   */
+  List<DeletedBlocksTransaction> getFailedTransactions()
+      throws IOException;
+
   /**
    * Increments count for given list of transactions by 1.
    * The log maintains a valid range of counts for each transaction
@@ -75,4 +85,13 @@
    */
   void addTransaction(String containerName, List<String> blocks)
       throws IOException;
+
+  /**
+   * Returns the total number of valid transactions. A transaction is
+   * considered valid as long as its count is in the range [0, MAX_RETRY].
+   *
+   * @return the number of valid transactions.
+   * @throws IOException
+   */
+  int getNumOfValidTransactions() throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index ef1a515c8f8..738157d221e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.scm.block;

 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;

@@ -147,6 +149,28 @@
     return result;
   }

+  @Override
+  public List<DeletedBlocksTransaction> getFailedTransactions()
+      throws IOException {
+    lock.lock();
+    try {
+      final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
+      deletedStore.iterate(null, (key, value) -> {
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction delTX =
+              DeletedBlocksTransaction.parseFrom(value);
+          if (delTX.getCount() == -1) {
+            failedTXs.add(delTX);
+          }
+        }
+        return true;
+      });
+      return failedTXs;
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -163,13 +187,14 @@
       DeletedBlocksTransaction block = DeletedBlocksTransaction
           .parseFrom(deletedStore.get(Longs.toByteArray(txID)));
       DeletedBlocksTransaction.Builder builder = block.toBuilder();
-      if (block.getCount() > -1) {
-        builder.setCount(block.getCount() + 1);
+      int currentCount = block.getCount();
+      if (currentCount > -1) {
+        builder.setCount(++currentCount);
       }
       // If the retry count exceeds the maxRetry value, set the count to -1
       // and stop retrying; admins can analyze those blocks and purge them
       // manually with SCMCli.
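      // Count lifecycle: a TX starts at 0 and is incremented each time SCM
      // re-sends it; once it passes maxRetry it is pinned at -1, which
      // getFailedTransactions() uses to identify given-up TXs.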
-      if (block.getCount() > maxRetry) {
+      if (currentCount > maxRetry) {
         builder.setCount(-1);
       }
       deletedStore.put(Longs.toByteArray(txID),
@@ -237,6 +262,28 @@
     }
   }

+  @Override
+  public int getNumOfValidTransactions() throws IOException {
+    lock.lock();
+    try {
+      final AtomicInteger num = new AtomicInteger(0);
+      deletedStore.iterate(null, (key, value) -> {
+        // Exclude the latest-txid record.
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction delTX =
+              DeletedBlocksTransaction.parseFrom(value);
+          if (delTX.getCount() > -1) {
+            num.incrementAndGet();
+          }
+        }
+        return true;
+      });
+      return num.get();
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @Override
   public void close() throws IOException {
     if (deletedStore != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
new file mode 100644
index 00000000000..3ca1133e187
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A background service running in SCM to delete blocks. The service scans
+ * the block deletion log at a configured interval and caches block deletion
+ * commands in {@link org.apache.hadoop.ozone.scm.node.CommandQueue}; the SCM
+ * heartbeat thread then asynchronously polls the cached commands and sends
+ * them to the datanodes for physical processing.
+ */
+public class SCMBlockDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockDeletingService.class);
+
+  // ThreadPoolSize=2: one thread for the scheduler and the other for the
+  // scanner.
+  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
+  private final DeletedBlockLog deletedBlockLog;
+  private final Mapping mappingService;
+  private final NodeManager nodeManager;
+
+  // The default container size is 5GB and the block size is 256MB, so a
+  // full container holds at most 20 blocks; hence each TX contains at most
+  // 20 blocks. When SCM sends block deletion TXs to a datanode, we allow
+  // at most 50 containers per command, which limits the number of
+  // to-be-deleted blocks to fewer than 1000.
+  // TODO - a better throttle algorithm.
+  // Note, this is not an accurate limit on blocks. When we scan the log,
+  // in the worst case we may get 50 TXs for 50 different datanodes, which
+  // makes the deletion messages sent by SCM extremely small. As a result,
+  // deletion will be slow. An improvement is to scan the log multiple times
+  // until we get enough TXs for each datanode, or the entire log is scanned.
+  private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
+
+  public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
+      Mapping mapper, NodeManager nodeManager,
+      int interval, long serviceTimeout) {
+    super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
+        BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
+    this.deletedBlockLog = deletedBlockLog;
+    this.mappingService = mapper;
+    this.nodeManager = nodeManager;
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new DeletedBlockTransactionScanner());
+    return queue;
+  }
+
+  private class DeletedBlockTransactionScanner
+      implements BackgroundTask<EmptyTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 1;
+    }
+
+    @Override
+    public EmptyTaskResult call() throws Exception {
+      // Scan the SCM DB at the HB interval and collect a throttled list of
+      // blocks to delete.
+      LOG.info("Running DeletedBlockTransactionScanner");
+      DatanodeDeletedBlockTransactions transactions =
+          getToDeleteContainerBlocks();
+      if (transactions != null && !transactions.isEmpty()) {
+        transactions.getDatanodes().forEach(datanodeID -> {
+          List<DeletedBlocksTransaction> dnTXs =
+              transactions.getDatanodeTransactions(datanodeID);
+          // TODO: commandQueue needs a cap.
+          // We should stop caching new commands if the number of
+          // un-processed commands exceeds a limit, e.g. 50. In case a
+          // datanode goes offline for some time, the cached commands would
+          // otherwise pile up without bound.
+          nodeManager.addDatanodeCommand(datanodeID,
+              new DeleteBlocksCommand(dnTXs));
+          LOG.info("Added delete block command for datanode {} in the queue,"
+              + " number of delete block transactions: {}, TxID list: {}",
+              datanodeID, dnTXs.size(),
+              String.join(",", transactions.getTransactionIDList(datanodeID)));
+        });
+      }
+      return EmptyTaskResult.newResult();
+    }
+
+    // Scan deleteBlocks.db for a batch of to-delete blocks; the batch size
+    // is properly throttled below.
+    private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
+      DatanodeDeletedBlockTransactions dnTXs =
+          new DatanodeDeletedBlockTransactions();
+      List<DeletedBlocksTransaction> txs = null;
+      try {
+        // Get a limited number of TXs to send via HB at a time.
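+        // BLOCK_DELETE_TX_PER_REQUEST_LIMIT (50) bounds each scan; with at
+        // most ~20 blocks per TX this keeps one round of commands to
+        // roughly 1000 blocks, per the throttle note above.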
+        txs = deletedBlockLog
+            .getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
+        LOG.info("Scanned deleted blocks log and got {} delTX to process",
+            txs.size());
+      } catch (IOException e) {
+        // We can tolerate a number of failures for some time, but if it
+        // continues to fail, at some point we need to raise an exception
+        // and probably fail the SCM? At present, it simply continues to
+        // retry the scanning.
+        LOG.error("Failed to get block deletion transactions from delTX log",
+            e);
+      }
+
+      if (txs != null) {
+        for (DeletedBlocksTransaction tx : txs) {
+          try {
+            ContainerInfo info = mappingService
+                .getContainer(tx.getContainerName());
+            // Find the datanodes this TX is supposed to be sent to.
+            info.getPipeline().getMachines()
+                .forEach(entry -> dnTXs.addTransaction(entry, tx));
+          } catch (IOException e) {
+            LOG.warn("Container {} not found, continue to process next",
+                tx.getContainerName(), e);
+          }
+        }
+      }
+      return dnTXs;
+    }
+  }
+
+  /**
+   * A wrapper class holding a datanode and all the deleted block
+   * transactions that will be sent to it.
+   */
+  private static class DatanodeDeletedBlockTransactions {
+
+    // The TXs mapped to a certain datanode ID.
+    private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
+
+    DatanodeDeletedBlockTransactions() {
+      this.transactions = Maps.newHashMap();
+    }
+
+    void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
+      if (transactions.containsKey(dnID)) {
+        transactions.get(dnID).add(tx);
+      } else {
+        List<DeletedBlocksTransaction> first = Lists.newArrayList();
+        first.add(tx);
+        transactions.put(dnID, first);
+      }
+      LOG.info("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+    }
+
+    Set<DatanodeID> getDatanodes() {
+      return transactions.keySet();
+    }
+
+    boolean isEmpty() {
+      return transactions.isEmpty();
+    }
+
+    boolean hasTransactions(DatanodeID dnID) {
+      return transactions.containsKey(dnID) &&
+          !transactions.get(dnID).isEmpty();
+    }
+
+    List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
+      return transactions.get(dnID);
+    }
+
+    List<String> getTransactionIDList(DatanodeID dnID) {
+      if (hasTransactions(dnID)) {
+        return transactions.get(dnID).stream()
+            .map(DeletedBlocksTransaction::getTxID)
+            .map(String::valueOf)
+            .collect(Collectors.toList());
+      } else {
+        return Collections.emptyList();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index c21e62c8f80..dec3776c88c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
@@ -134,4 +135,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @return Healthy/Stale/Dead.
    */
   NodeState getNodeState(DatanodeID id);
+
+  /**
+   * Adds a {@link SCMCommand} to the command queue; queued commands are
+   * handled asynchronously by the HB thread.
+ * @param id + * @param command + */ + default void addDatanodeCommand(DatanodeID id, SCMCommand command) {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index bcd3a02fe93..6e2805a5d3b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -836,4 +836,9 @@ public class SCMNodeManager } return nodeCountMap; } + + @Override + public void addDatanodeCommand(DatanodeID id, SCMCommand command) { + this.commandQueue.addCommand(id, command); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java index b37a5dbbb07..198300fb5f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java @@ -23,7 +23,22 @@ package org.apache.hadoop.utils; public interface BackgroundTaskResult { /** - * Returns the size of entries included in this result. + * Returns the size of entries included in this result. */ int getSize(); + + /** + * An empty task result implementation. + */ + class EmptyTaskResult implements BackgroundTaskResult { + + public static EmptyTaskResult newResult() { + return new EmptyTaskResult(); + } + + @Override + public int getSize() { + return 0; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index a8cfa57e8ff..bb3d137c2d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -122,7 +122,6 @@ message ContainerReportsProto { required reportType type = 3; } - /** * This message is send along with the heart beat to report datanode * storage utilization by SCM. @@ -210,6 +209,7 @@ enum Type { registeredCommand = 3; sendContainerReport = 4; reregisterCommand = 5; + deleteBlocksCommand = 6; } /* @@ -221,6 +221,7 @@ message SCMCommandResponseProto { optional SCMVersionResponseProto versionProto = 4; optional SendContainerReportProto sendReport = 5; optional SCMReregisterCmdResponseProto reregisterProto = 6; + optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7; } @@ -231,6 +232,24 @@ message SCMHeartbeatResponseProto { repeated SCMCommandResponseProto commands = 1; } +// HB response from SCM, contains a list of block deletion transactions. +message SCMDeleteBlocksCmdResponseProto { + repeated DeletedBlocksTransaction deletedBlocksTransactions = 1; +} + +// SendACK response returned by datanode to SCM, currently empty. +message ContainerBlocksDeletionACKResponseProto { +} + +// ACK message datanode sent to SCM, contains the result of +// block deletion transactions. +message ContainerBlocksDeletionACKProto { + message DeleteBlockTransactionResult { + required int64 txID = 1; + required bool success = 2; + } + repeated DeleteBlockTransactionResult results = 1; +} /** * Protocol used from a datanode to StorageContainerManager. 
@@ -318,6 +337,10 @@ service StorageContainerDatanodeProtocolService {
   send container reports sends the container report to SCM. This will
   return a null command as response.
   */
-  rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
+  rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
+
+  /**
+   * Sends the block deletion ACK to SCM.
+   */
+  rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 5d8308e5cb9..d0c363de022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,22 +17,35 @@
  */
 package org.apache.hadoop.ozone;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.fail;
 import java.io.IOException;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
+import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.junit.Rule;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.Collections;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
+import org.apache.hadoop.test.GenericTestUtils;
 
 /**
  * Test class that exercises the StorageContainerManager.
@@ -149,4 +162,103 @@ public class TestStorageContainerManager {
     Assert.assertTrue(e instanceof IOException);
     Assert.assertEquals(expectedErrorMessage, e.getMessage());
   }
+
+  @Test
+  public void testBlockDeletionTransactions() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
+    conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    MiniOzoneCluster cluster =
+        new MiniOzoneCluster.Builder(conf).numDataNodes(1)
+            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+
+    DeletedBlockLog delLog = cluster.getStorageContainerManager()
+        .getScmBlockManager().getDeletedBlockLog();
+    Assert.assertEquals(0, delLog.getNumOfValidTransactions());
+
+    // Create 20 keys with random names.
+    TestStorageContainerManagerHelper helper =
+        new TestStorageContainerManagerHelper(cluster, conf);
+    Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
+
+    // These keys will be written into a number of containers; collect the
+    // set of container names and verify the blocks of these containers on
+    // the datanodes.
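+    // (Note: deletion TXs reach datanodes only via heartbeats, so the short
+    // HB and block-deleting-service intervals configured above are what let
+    // the waitFor() checks below finish within their timeouts.)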
+    Set<String> containerNames = new HashSet<>();
+    for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
+      entry.getValue().getKeyLocationList()
+          .forEach(loc -> containerNames.add(loc.getContainerName()));
+    }
+
+    // The total number of blocks in these containers should equal the
+    // total number of blocks allocated via the key creation calls.
+    int totalCreatedBlocks = 0;
+    for (KsmKeyInfo info : keyLocations.values()) {
+      totalCreatedBlocks += info.getKeyLocationList().size();
+    }
+    Assert.assertTrue(totalCreatedBlocks > 0);
+    Assert.assertEquals(totalCreatedBlocks,
+        helper.getAllBlocks(containerNames).size());
+
+    // Create a deletion TX for each key.
+    Map<String, List<String>> containerBlocks = Maps.newHashMap();
+    for (KsmKeyInfo info : keyLocations.values()) {
+      List<KsmKeyLocationInfo> list = info.getKeyLocationList();
+      list.forEach(location -> {
+        if (containerBlocks.containsKey(location.getContainerName())) {
+          containerBlocks.get(location.getContainerName())
+              .add(location.getBlockID());
+        } else {
+          List<String> blks = Lists.newArrayList();
+          blks.add(location.getBlockID());
+          containerBlocks.put(location.getContainerName(), blks);
+        }
+      });
+    }
+    for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
+      delLog.addTransaction(tx.getKey(), tx.getValue());
+    }
+
+    // Verify that a few TXs were created in the TX log.
+    Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+    // Once TXs are written into the log, SCM fetches TX entries from the
+    // log and schedules block deletions on each HB interval. After some
+    // time all the TXs should have been processed, and by then all known
+    // containers should contain no blocks again.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return delLog.getNumOfValidTransactions() == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    Assert.assertTrue(helper.getAllBlocks(containerNames).isEmpty());
+
+    // Next, add some TXs with known container names but unknown block IDs.
+    for (String containerName : containerBlocks.keySet()) {
+      // Add 2 TXs per container.
+      delLog.addTransaction(containerName,
+          Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
+      delLog.addTransaction(containerName,
+          Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
+    }
+
+    // Verify that a few TXs were created in the TX log.
+    Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+    // These blocks cannot be found in the containers, so deleting them is
+    // skipped; eventually these TXs will still succeed.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return delLog.getFailedTransactions().size() == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, 1000, 10000);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
new file mode 100644
index 00000000000..2123d6fea36
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * A helper class used by {@link TestStorageContainerManager} to generate
+ * some keys and to help verify container and block locations.
+ */
+public class TestStorageContainerManagerHelper {
+
+  private final MiniOzoneCluster cluster;
+  private final Configuration conf;
+  private final StorageHandler storageHandler;
+
+  public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
+      Configuration conf) throws IOException {
+    this.cluster = cluster;
+    this.conf = conf;
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+  }
+
+  public Map<String, KsmKeyInfo> createKeys(int numOfKeys, int keySize)
+      throws Exception {
+    Map<String, KsmKeyInfo> keyLocationMap = Maps.newHashMap();
+    String volume = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucket = "bucket" + RandomStringUtils.randomNumeric(5);
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volume, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucket, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    // Write numOfKeys keys into the bucket.
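+    // (Each key gets a small random payload; the lookup loop that follows
+    // records the container/block locations KSM assigned to each key.)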
+    Set<String> keyNames = Sets.newHashSet();
+    KeyArgs keyArgs;
+    for (int i = 0; i < numOfKeys; i++) {
+      String keyName = RandomStringUtils.randomAlphabetic(5) + i;
+      keyNames.add(keyName);
+      keyArgs = new KeyArgs(keyName, bucketArgs);
+      keyArgs.setSize(keySize);
+      // No need to write real data for these tests; a small random
+      // payload is enough.
+      OutputStream stream = storageHandler.newKeyWriter(keyArgs);
+      stream.write(DFSUtil.string2Bytes(
+          RandomStringUtils.randomAlphabetic(5)));
+      stream.close();
+    }
+
+    for (String key : keyNames) {
+      KsmKeyArgs arg = new KsmKeyArgs.Builder()
+          .setVolumeName(volume)
+          .setBucketName(bucket)
+          .setKeyName(key)
+          .build();
+      KsmKeyInfo location = cluster.getKeySpaceManager()
+          .lookupKey(arg);
+      keyLocationMap.put(key, location);
+    }
+    return keyLocationMap;
+  }
+
+  public List<String> getPendingDeletionBlocks(String containerName)
+      throws IOException {
+    List<String> pendingDeletionBlocks = Lists.newArrayList();
+    MetadataStore meta = getContainerMetadata(containerName);
+    KeyPrefixFilter filter =
+        new KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> kvs = meta
+        .getRangeKVs(null, Integer.MAX_VALUE, filter);
+    kvs.forEach(entry -> {
+      String key = DFSUtil.bytes2String(entry.getKey());
+      pendingDeletionBlocks
+          .add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
+    });
+    return pendingDeletionBlocks;
+  }
+
+  public List<String> getAllBlocks(Set<String> containerNames)
+      throws IOException {
+    List<String> allBlocks = Lists.newArrayList();
+    for (String containerName : containerNames) {
+      allBlocks.addAll(getAllBlocks(containerName));
+    }
+    return allBlocks;
+  }
+
+  public List<String> getAllBlocks(String containerName) throws IOException {
+    List<String> allBlocks = Lists.newArrayList();
+    MetadataStore meta = getContainerMetadata(containerName);
+    MetadataKeyFilter filter =
+        (preKey, currentKey, nextKey) -> !DFSUtil.bytes2String(currentKey)
+            .startsWith(OzoneConsts.DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> kvs =
+        meta.getRangeKVs(null, Integer.MAX_VALUE, filter);
+    kvs.forEach(entry -> {
+      String key = DFSUtil.bytes2String(entry.getKey());
+      allBlocks.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
+    });
+    return allBlocks;
+  }
+
+  private MetadataStore getContainerMetadata(String containerName)
+      throws IOException {
+    Pipeline pipeline = cluster.getStorageContainerManager()
+        .getContainer(containerName);
+    DatanodeID leadDN = pipeline.getLeader();
+    OzoneContainer containerServer =
+        getContainerServerByDatanodeID(leadDN.getDatanodeUuid());
+    ContainerData containerData = containerServer.getContainerManager()
+        .readContainer(containerName);
+    return KeyUtils.getDB(containerData, conf);
+  }
+
+  private OzoneContainer getContainerServerByDatanodeID(String dnUUID)
+      throws IOException {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getDatanodeUuid().equals(dnUUID)) {
+        return dn.getOzoneContainerManager();
+      }
+    }
+    throw new IOException("Unable to get the ozone container "
+        + "for given datanode ID " + dnUUID);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index c0bdd9e62a2..edef2b478c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -25,6 +25,8 @@ import
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; import org.apache.hadoop.ozone.scm.VersionInfo; import java.io.IOException; @@ -188,6 +190,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol { .build(); } + @Override + public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + ContainerBlocksDeletionACKProto request) throws IOException { + return ContainerBlocksDeletionACKResponseProto + .newBuilder().getDefaultInstanceForType(); + } + public ReportState getReportState() { return this.reportState; }
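Finally, a usage note on the test helper introduced above: together with the pending-deletion scan it can be used to watch blocks drain out of a container. A minimal sketch, assuming a running MiniOzoneCluster named cluster with its Configuration conf, and a containerName taken from some key's location info (all three are stand-ins, not defined verbatim in this patch's tests):

    import java.util.List;

    TestStorageContainerManagerHelper helper =
        new TestStorageContainerManagerHelper(cluster, conf);
    // Blocks still visible in the container; keys already renamed with the
    // deleting prefix are filtered out by getAllBlocks().
    List<String> live = helper.getAllBlocks(containerName);
    // Blocks the datanode has marked for deletion but not yet purged.
    List<String> pending = helper.getPendingDeletionBlocks(containerName);
    // After the block deleting service has processed the container, both
    // lists drain to empty.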