diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
new file mode 100644
index 00000000000..5571f0af111
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .ScmBlockLocationProtocolProtos.KeyBlocks;
+
+import java.util.List;
+
+/**
+ * A group of related blocks, e.g. the blocks that belong to a certain
+ * object key.
+ */
+public final class BlockGroup {
+
+  private String groupID;
+  private List<String> blockIDs;
+  private BlockGroup(String groupID, List<String> blockIDs) {
+    this.groupID = groupID;
+    this.blockIDs = blockIDs;
+  }
+
+  public List<String> getBlockIDList() {
+    return blockIDs;
+  }
+
+  public String getGroupID() {
+    return groupID;
+  }
+
+  public KeyBlocks getProto() {
+    return KeyBlocks.newBuilder().setKey(groupID)
+        .addAllBlocks(blockIDs).build();
+  }
+
+  /**
+   * Parses a KeyBlocks proto to a group of blocks.
+   * @param proto KeyBlocks proto.
+   * @return a group of blocks.
+   */
+  public static BlockGroup getFromProto(KeyBlocks proto) {
+    return BlockGroup.newBuilder().setKeyName(proto.getKey())
+        .addAllBlockIDs(proto.getBlocksList()).build();
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * BlockGroup instance builder.
+   */
+  public static class Builder {
+
+    private String groupID;
+    private List<String> blockIDs;
+
+    public Builder setKeyName(String blockGroupID) {
+      this.groupID = blockGroupID;
+      return this;
+    }
+
+    public Builder addAllBlockIDs(List<String> keyBlocks) {
+      this.blockIDs = keyBlocks;
+      return this;
+    }
+
+    public BlockGroup build() {
+      return new BlockGroup(groupID, blockIDs);
+    }
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
new file mode 100644
index 00000000000..f56f832b069
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
+import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Result of deleting a group of blocks.
+ */
+public class DeleteBlockGroupResult {
+  private String objectKey;
+  private List<DeleteBlockResult> blockResultList;
+  public DeleteBlockGroupResult(String objectKey,
+      List<DeleteBlockResult> blockResultList) {
+    this.objectKey = objectKey;
+    this.blockResultList = blockResultList;
+  }
+
+  public String getObjectKey() {
+    return objectKey;
+  }
+
+  public List<DeleteBlockResult> getBlockResultList() {
+    return blockResultList;
+  }
+
+  public List<DeleteScmBlockResult> getBlockResultProtoList() {
+    List<DeleteScmBlockResult> resultProtoList =
+        new ArrayList<>(blockResultList.size());
+    for (DeleteBlockResult result : blockResultList) {
+      DeleteScmBlockResult proto = DeleteScmBlockResult.newBuilder()
+          .setKey(result.getKey())
+          .setResult(result.getResult()).build();
+      resultProtoList.add(proto);
+    }
+    return resultProtoList;
+  }
+
+  public static List<DeleteBlockResult> convertBlockResultProto(
+      List<DeleteScmBlockResult> results) {
+    List<DeleteBlockResult> protoResults = new ArrayList<>(results.size());
+    for (DeleteScmBlockResult result : results) {
+      protoResults.add(new DeleteBlockResult(result.getKey(),
+          result.getResult()));
+    }
+    return protoResults;
+  }
+
+  /**
+   * A group is considered successfully executed only if all of its
+   * blocks are successfully deleted.
+   *
+   * @return true if all blocks are successfully deleted, false otherwise.
+   */
+  public boolean isSuccess() {
+    for (DeleteBlockResult result : blockResultList) {
+      if (result.getResult() != Result.success) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @return a list of IDs of the blocks that failed to be deleted.
+   */
+  public List<String> getFailedBlocks() {
+    List<String> failedBlocks = blockResultList.stream()
+        .filter(result -> result.getResult() != Result.success)
+        .map(DeleteBlockResult::getKey).collect(Collectors.toList());
+    return failedBlocks;
+  }
+}
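Taken together, `BlockGroup` and `DeleteBlockGroupResult` form the unit of exchange between KSM and SCM. A minimal usage sketch (not part of the patch; the key and block names are made up for illustration):

```java
import java.util.Arrays;
import org.apache.hadoop.ozone.common.BlockGroup;

public class BlockGroupExample {
  public static void main(String[] args) {
    // Group the blocks of one object key, as KSM does before calling SCM.
    BlockGroup group = BlockGroup.newBuilder()
        .setKeyName("#deleting#/vol1/bucket1/key1")   // hypothetical key name
        .addAllBlockIDs(Arrays.asList("block-1", "block-2"))
        .build();
    // group.getProto() is the KeyBlocks message that goes over the wire.
    System.out.println(group.getGroupID() + " -> " + group.getBlockIDList());
  }
}
```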
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
index e69300cb159..6710b0a332c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
@@ -74,4 +74,8 @@ public final class KSMConfigKeys {
       "ozone.ksm.group.rights";
   public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
       OzoneAcl.OzoneACLRights.READ_WRITE;
+
+  public static final String OZONE_KEY_DELETING_LIMIT_PER_TASK =
+      "ozone.key.deleting.limit.per.task";
+  public static final int OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT = 1000;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
index e944375267a..77d040da932 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 
 /**
  * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@@ -53,13 +54,14 @@ public interface ScmBlockLocationProtocol {
   AllocatedBlock allocateBlock(long size) throws IOException;
 
   /**
-   * Delete the set of keys specified.
+   * Delete blocks for a set of object keys.
    *
-   * @param keys batch of block keys to delete.
+   * @param keyBlocksInfoList list of object keys and their blocks.
    * @return list of block deletion results.
    * @throws IOException if there is any failure.
    *
    */
-  List<DeleteBlockResult> deleteBlocks(Set<String> keys) throws IOException;
+  List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException;
 }
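A hypothetical caller sketch for the new interface method; the `scmClient` instance is assumed to be an already wired `ScmBlockLocationProtocol`, and the key/block names are illustrative only:

```java
// Sketch only: scmClient wiring and all names here are assumptions.
List<BlockGroup> groups = Collections.singletonList(
    BlockGroup.newBuilder()
        .setKeyName("#deleting#/vol1/bucket1/key1")
        .addAllBlockIDs(Arrays.asList("block-1", "block-2"))
        .build());
for (DeleteBlockGroupResult result : scmClient.deleteKeyBlocks(groups)) {
  if (!result.isSuccess()) {
    // Retain the key in pending-deletion state and retry later.
    System.err.println("Failed blocks: " + result.getFailedBlocks());
  }
}
```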
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 4a0d50be6cc..2e323e08770 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -24,24 +24,24 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .AllocateScmBlockRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .AllocateScmBlockResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
-    .DeleteScmBlocksRequestProto;
-import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
-    .DeleteScmBlocksResponseProto;
-import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
-    .DeleteScmBlockResult;
+    .DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .GetScmBlockLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .GetScmBlockLocationsResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .ScmLocatedBlockProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+    .KeyBlocks;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 
@@ -50,6 +50,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is the client-side translator to translate the requests made on
@@ -147,30 +148,32 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
   /**
    * Delete the set of keys specified.
    *
-   * @param keys batch of block keys to delete.
+   * @param keyBlocksInfoList batch of object keys and their blocks to delete.
    * @return list of block deletion results.
    * @throws IOException if there is any failure.
    *
    */
   @Override
-  public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
-      throws IOException {
-    Preconditions.checkArgument(keys != null && !keys.isEmpty(),
-        "keys to be deleted cannot be null or empty");
-    DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
-        .newBuilder()
-        .addAllKeys(keys)
-        .build();
-    final DeleteScmBlocksResponseProto resp;
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
+        .map(BlockGroup::getProto).collect(Collectors.toList());
+    DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
+        .newBuilder().addAllKeyBlocks(keyBlocksProto).build();
+
+    final DeleteScmKeyBlocksResponseProto resp;
     try {
-      resp = rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
+      resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request);
     } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
-    List<DeleteBlockResult> results =
-        new ArrayList<DeleteBlockResult>(resp.getResultsCount());
-    for (DeleteScmBlockResult result : resp.getResultsList()) {
-      results.add(new DeleteBlockResult(result.getKey(), result.getResult()));
-    }
+    List<DeleteBlockGroupResult> results =
+        new ArrayList<>(resp.getResultsCount());
+    results.addAll(resp.getResultsList().stream().map(
+        result -> new DeleteBlockGroupResult(result.getObjectKey(),
+            DeleteBlockGroupResult
+                .convertBlockResultProto(result.getBlockResultsList())))
+        .collect(Collectors.toList()));
     return results;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
index e9c3abf3002..51899f91ac5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
@@ -64,17 +64,40 @@ message AllocateScmBlockRequestProto {
 }
 
 /**
- * keys - batch of block keys to deleted
+ * A delete key request sent by KSM to SCM; it contains
+ * multiple keys (and their blocks).
 */
-message DeleteScmBlocksRequestProto {
-  repeated string keys = 1;
+message DeleteScmKeyBlocksRequestProto {
+  repeated KeyBlocks keyBlocks = 1;
 }
 
 /**
- * deletedKeys - keys that are deleted successfully
+ * An object key and all its associated blocks.
+ * We need to encapsulate the object key name plus the blocks in this protocol
+ * because SCM needs to respond to KSM with the keys it has deleted.
+ * If the response only contained blocks, it would be very expensive for
+ * KSM to figure out which keys have been deleted.
 */
-message DeleteScmBlocksResponseProto {
-  repeated DeleteScmBlockResult results = 1;
+message KeyBlocks {
+  required string key = 1;
+  repeated string blocks = 2;
+}
+
+/**
+ * A delete key response from SCM to KSM; it contains multiple child-results.
+ * Each child-result represents a key deletion result; a key result is
+ * considered successful only if all blocks of the key are successfully
+ * deleted.
+ */
+message DeleteScmKeyBlocksResponseProto {
+  repeated DeleteKeyBlocksResultProto results = 1;
+}
+
+/**
+ * A key deletion result. It contains all the block deletion results.
+ */
+message DeleteKeyBlocksResultProto {
+  required string objectKey = 1;
+  repeated DeleteScmBlockResult blockResults = 2;
 }
 
 message DeleteScmBlockResult {
@@ -126,8 +149,8 @@ service ScmBlockLocationProtocolService {
       returns (AllocateScmBlockResponseProto);
 
   /**
-   * Deletes one or multiple block keys from SCM.
+   * Deletes blocks for a set of object keys from SCM.
    */
-  rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
-      returns (DeleteScmBlocksResponseProto);
+  rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
+      returns (DeleteScmKeyBlocksResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
index 1027bd4211a..010de58daff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.utils.BatchOperation;
@@ -206,4 +207,16 @@ public interface KSMMetadataManager {
    */
   List<KsmVolumeArgs> listVolumes(String userName, String prefix,
       String startKey, int maxKeys) throws IOException;
+
+  /**
+   * Returns a list of pending deletion key info, up to the given count.
+   * Each entry is a {@link BlockGroup}, which contains the info about the
+   * key name and all its associated block IDs. A pending deletion key is
+   * stored with the #deleting# prefix in the KSM DB.
+   *
+   * @param count max number of keys to return.
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
index e3b3db204cd..9413e1dff2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
@@ -21,9 +21,8 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.*;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
@@ -47,6 +46,7 @@ import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
@@ -440,4 +440,28 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
 
     return builder.build();
   }
+
+  @Override
+  public List<BlockGroup> getPendingDeletionKeys(final int count)
+      throws IOException {
+    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    final MetadataKeyFilter deletingKeyFilter =
+        new KeyPrefixFilter(DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> rangeResult =
+        store.getRangeKVs(null, count, deletingKeyFilter);
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      KsmKeyInfo info =
+          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+      // Get block keys as a list.
+      List<String> item = info.getKeyLocationList().stream()
+          .map(KsmKeyLocationInfo::getBlockID)
+          .collect(Collectors.toList());
+      BlockGroup keyBlocks = BlockGroup.newBuilder()
+          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+          .addAllBlockIDs(item)
+          .build();
+      keyBlocksList.add(keyBlocks);
+    }
+    return keyBlocksList;
+  }
 }
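getPendingDeletionKeys relies on keys having been re-inserted under OzoneConsts.DELETING_KEY_PREFIX when they were deleted. A rough sketch of that naming convention (the `store` calls are stand-ins for illustration, not this patch's code):

```java
// Sketch of the #deleting# convention, under stated assumptions.
String objectDbKey = "/vol1/bucket1/key1";                  // live key entry
String deletingDbKey = OzoneConsts.DELETING_KEY_PREFIX + objectDbKey;
// On key deletion, KSM re-keys the KeyInfo under the prefix so that
// getPendingDeletionKeys() and the background service can find it later:
// store.put(DFSUtil.string2Bytes(deletingDbKey), keyInfoBytes);
// store.delete(DFSUtil.string2Bytes(objectDbKey));
```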
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
new file mode 100644
index 00000000000..293e12dfd14
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * This is the background service to delete keys.
+ * It periodically scans KSM metadata for keys with the "#deleting#"
+ * prefix and asks SCM to delete their block metadata accordingly; if SCM
+ * returns success for a key, that key is then cleaned up from KSM.
+ */
+public class KeyDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyDeletingService.class);
+
+  // The thread pool size for key deleting service.
+  private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
+
+  private final ScmBlockLocationProtocol scmClient;
+  private final KeyManager manager;
+  private final int keyLimitPerTask;
+
+  public KeyDeletingService(ScmBlockLocationProtocol scmClient,
+      KeyManager manager, int serviceInterval,
+      long serviceTimeout, Configuration conf) {
+    super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
+        KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.scmClient = scmClient;
+    this.manager = manager;
+    this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
+        OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new KeyDeletingTask());
+    return queue;
+  }
+
+  /**
+   * A key deleting task scans the KSM DB looking for a certain number
+   * of pending-deletion keys, and sends these keys along with their
+   * associated blocks to SCM for deletion. Once SCM confirms that the keys
+   * are deleted (i.e. once SCM has persisted the block info in its
+   * deletedBlockLog), it removes these keys from the DB.
+   */
+  private class KeyDeletingTask implements
+      BackgroundTask<BackgroundTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      try {
+        List<BlockGroup> keyBlocksList = manager
+            .getPendingDeletionKeys(keyLimitPerTask);
+        if (keyBlocksList.size() > 0) {
+          LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size());
+          List<DeleteBlockGroupResult> results =
+              scmClient.deleteKeyBlocks(keyBlocksList);
+          for (DeleteBlockGroupResult result : results) {
+            if (result.isSuccess()) {
+              try {
+                // Purge key from KSM DB.
+                manager.deletePendingDeletionKey(result.getObjectKey());
+                LOG.info("Key {} deleted from KSM DB", result.getObjectKey());
+              } catch (IOException e) {
+                // If a pending-deletion key fails to be deleted,
+                // print a warning here and retain it in this state,
+                // so that it can be attempted again next time.
+                LOG.warn("Failed to delete pending-deletion key {}",
+                    result.getObjectKey(), e);
+              }
+            } else {
+              // Key deletion failed, retry in next interval.
+              LOG.warn("Key {} deletion failed because some of the blocks"
+                  + " could not be deleted, failed blocks: {}",
+                  result.getObjectKey(),
+                  String.join(",", result.getFailedBlocks()));
+            }
+          }
+          return results::size;
+        } else {
+          LOG.info("No pending deletion key found in KSM");
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to get pending deletion keys, retry in"
+            + " next interval", e);
+      }
+      return EmptyTaskResult.newResult();
+    }
+  }
+}
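A minimal wiring sketch for the service, mirroring what KeyManagerImpl does later in this patch; the `scmBlockClient` and `keyManager` variables, and the interval/timeout values, are assumptions for illustration:

```java
Configuration conf = new OzoneConfiguration();
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 60000);
conf.setInt(KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, 1000);
KeyDeletingService service = new KeyDeletingService(
    scmBlockClient, keyManager, 60000 /* interval, ms */,
    300000 /* timeout, ms */, conf);
service.start();
// ... on shutdown:
service.shutdown();
```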
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
index 753a564eeac..5625cb76fbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 
@@ -26,6 +27,17 @@ import java.util.List;
  * Handles key level commands.
  */
 public interface KeyManager {
+
+  /**
+   * Start key manager.
+   */
+  void start();
+
+  /**
+   * Stop key manager.
+   */
+  void stop() throws IOException;
+
   /**
    * Given the args of a key to put, return a pipeline for the key. Writes
    * the key to pipeline mapping to meta data.
@@ -89,4 +101,26 @@ public interface KeyManager {
   List<KsmKeyInfo> listKeys(String volumeName,
       String bucketName, String startKey, String keyPrefix, int maxKeys)
       throws IOException;
+
+  /**
+   * Returns a list of pending deletion key info, up to the given count.
+   * Each entry is a {@link BlockGroup}, which contains the info about the
+   * key name and all its associated block IDs. A pending deletion key is
+   * stored with the #deleting# prefix in the KSM DB.
+   *
+   * @param count max number of keys to return.
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+  /**
+   * Deletes a pending deletion key by its name. This is often called when
+   * the key can be safely deleted from this layer. Once called, all
+   * footprints of the key will be purged from the KSM DB.
+   *
+   * @param objectKeyName object key name with the #deleting# prefix.
+   * @throws IOException if the specified key doesn't exist or other I/O
+   *     errors occur.
+   */
+  void deletePendingDeletionKey(String objectKeyName) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index aea7b3199e8..2bf9a33a7a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -17,7 +17,10 @@
 package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.OzoneConfiguration;
@@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.BatchOperation;
 import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
@@ -35,7 +39,12 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
 
@@ -52,6 +61,7 @@ public class KeyManagerImpl implements KeyManager {
   private final ScmBlockLocationProtocol scmBlockClient;
   private final KSMMetadataManager metadataManager;
   private final long scmBlockSize;
+  private final BackgroundService keyDeletingService;
 
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       KSMMetadataManager metadataManager, OzoneConfiguration conf) {
@@ -59,6 +69,24 @@ public class KeyManagerImpl implements KeyManager {
     this.metadataManager = metadataManager;
     this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
         OZONE_SCM_BLOCK_SIZE_DEFAULT);
+    int svcInterval = conf.getInt(
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+    long serviceTimeout = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    keyDeletingService = new KeyDeletingService(
+        scmBlockClient, this, svcInterval, serviceTimeout, conf);
+  }
+
+  @Override
+  public void start() {
+    keyDeletingService.start();
+  }
+
+  @Override
+  public void stop() throws IOException {
+    keyDeletingService.shutdown();
   }
 
   @Override
@@ -181,7 +209,6 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public void deleteKey(KsmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
@@ -223,4 +250,39 @@ public class KeyManagerImpl implements KeyManager {
       metadataManager.readLock().unlock();
     }
   }
+
+  @Override
+  public List<BlockGroup> getPendingDeletionKeys(final int count)
+      throws IOException {
+    metadataManager.readLock().lock();
+    try {
+      return metadataManager.getPendingDeletionKeys(count);
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void deletePendingDeletionKey(String objectKeyName)
+      throws IOException {
+    Preconditions.checkNotNull(objectKeyName);
+    if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) {
+      throw new IllegalArgumentException("Invalid key name,"
+          + " the name should be the key name with deleting prefix");
+    }
+
+    // Simply removes the entry from KSM DB.
+    metadataManager.writeLock().lock();
+    try {
+      byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName);
+      byte[] delKeyValue = metadataManager.get(pendingDelKey);
+      if (delKeyValue == null) {
+        throw new IOException("Failed to delete key " + objectKeyName
+            + " because it is not found in DB");
+      }
+      metadataManager.delete(pendingDelKey);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 3b76e5cea83..bdd97d10b2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -217,6 +217,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
         ksmRpcAddress));
     DefaultMetricsSystem.initialize("KeySpaceManager");
     metadataManager.start();
+    keyManager.start();
     ksmRpcServer.start();
     httpServer.start();
     registerMXBean();
@@ -228,8 +229,9 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
    */
   public void stop() {
     try {
-      ksmRpcServer.stop();
       metadataManager.stop();
+      ksmRpcServer.stop();
+      keyManager.stop();
       httpServer.stop();
       metrics.unRegister();
       unregisterMXBean();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
index e8843fc384b..7ea06ad11ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -21,8 +21,10 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -32,11 +34,9 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlocksRequestProto;
+    .ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlocksResponseProto;
-import static org.apache.hadoop.ozone.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
+    .ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.protocol.proto
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -123,21 +124,22 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
   }
 
   @Override
-  public DeleteScmBlocksResponseProto deleteScmBlocks(
-      RpcController controller, DeleteScmBlocksRequestProto req)
+  public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
+      RpcController controller, DeleteScmKeyBlocksRequestProto req)
       throws ServiceException {
-    Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
-        req.getKeysCount());
-    for (String key : req.getKeysList()) {
-      keys.add(key);
-    }
-    DeleteScmBlocksResponseProto.Builder resp =
-        DeleteScmBlocksResponseProto.newBuilder();
+    DeleteScmKeyBlocksResponseProto.Builder resp =
+        DeleteScmKeyBlocksResponseProto.newBuilder();
     try {
-      final List<DeleteBlockResult> results = impl.deleteBlocks(keys);
-      for (DeleteBlockResult result: results) {
-        DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
-            .newBuilder().setKey(result.getKey()).setResult(result.getResult());
+      List<BlockGroup> infoList = req.getKeyBlocksList().stream()
+          .map(BlockGroup::getFromProto).collect(Collectors.toList());
+      final List<DeleteBlockGroupResult> results =
+          impl.deleteKeyBlocks(infoList);
+      for (DeleteBlockGroupResult result: results) {
+        DeleteKeyBlocksResultProto.Builder deleteResult =
+            DeleteKeyBlocksResultProto
+                .newBuilder()
+                .setObjectKey(result.getObjectKey())
+                .addAllBlockResults(result.getBlockResultProtoList());
         resp.addResults(deleteResult.build());
       }
     } catch (IOException ex) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index 5a97735e172..6677b65c78c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -85,6 +87,7 @@ import org.slf4j.LoggerFactory;
 import javax.management.ObjectName;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -838,19 +841,26 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Delete blocks.
-   * @param keys batch of block keys to delete.
+   * Delete blocks for a set of object keys.
+   *
+   * @param keyBlocksInfoList list of object keys and their blocks to delete.
    * @return deletion results.
    */
-  public List<DeleteBlockResult> deleteBlocks(final Set<String> keys) {
-    List<DeleteBlockResult> results = new LinkedList<>();
-    for (String key: keys) {
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    LOG.info("SCM is informed by KSM to delete {} blocks",
+        keyBlocksInfoList.size());
+    List<DeleteBlockGroupResult> results = new ArrayList<>();
+    for (BlockGroup keyBlocks : keyBlocksInfoList) {
       Result resultCode;
       try {
-        scmBlockManager.deleteBlock(key);
+        // We delete blocks in an atomic operation to prevent getting
+        // into a state where only some of the blocks are deleted,
+        // which would leave the key in an inconsistent state.
+        scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
         resultCode = Result.success;
       } catch (SCMException scmEx) {
-        LOG.warn("Fail to delete block: {}", key, scmEx);
+        LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
         switch (scmEx.getResult()) {
         case CHILL_MODE_EXCEPTION:
           resultCode = Result.chillMode;
@@ -862,10 +872,16 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
           resultCode = Result.unknownFailure;
         }
       } catch (IOException ex) {
-        LOG.warn("Fail to delete block: {}", key, ex);
+        LOG.warn("Fail to delete blocks for object key: {}",
+            keyBlocks.getGroupID(), ex);
         resultCode = Result.unknownFailure;
       }
-      results.add(new DeleteBlockResult(key, resultCode));
+      List<DeleteBlockResult> blockResultList = new ArrayList<>();
+      for (String blockKey : keyBlocks.getBlockIDList()) {
+        blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
+      }
+      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+          blockResultList));
     }
     return results;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
index 4672b338f7e..2693c061b32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 /**
  *
@@ -46,11 +47,16 @@ public interface BlockManager extends Closeable {
   Pipeline getBlock(String key) throws IOException;
 
   /**
-   * Given a key of the block, delete the block.
-   * @param key - key of the block.
-   * @throws IOException
+   * Deletes a list of blocks in an atomic operation. Internally, SCM
+   * writes these blocks into a {@link DeletedBlockLog} and deletes them
+   * from the SCM DB. If this is successful, the given blocks enter the
+   * pending deletion state and become invisible in the SCM namespace.
+   *
+   * @param blockIDs block IDs. This is often the list of blocks of
+   *     a particular object key.
+   * @throws IOException if an exception happens; none of the blocks is
+   *     deleted in that case.
    */
-  void deleteBlock(String key) throws IOException;
+  void deleteBlocks(List<String> blockIDs) throws IOException;
 
   /**
    * @return the block deletion transaction log maintained by SCM.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index d5202464d35..1a2dc149e66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -45,6 +45,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -382,8 +383,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     }
     // now we should have some candidates in ALLOCATE state
     if (candidates.size() == 0) {
-      throw new SCMException("Fail to find any container to allocate block " +
-          "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
+      throw new SCMException("Fail to find any container to allocate block "
+          + "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
     }
   }
@@ -475,35 +476,84 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   }
 
   /**
-   * Given a block key, delete a block.
-   * @param key - block key assigned by SCM.
-   * @throws IOException
+   * Deletes a list of blocks in an atomic operation. Internally, SCM
+   * writes these blocks into a {@link DeletedBlockLog} and deletes them
+   * from the SCM DB. If this is successful, the given blocks enter the
+   * pending deletion state and become invisible in the SCM namespace.
+   *
+   * @param blockIDs block IDs. This is often the list of blocks of
+   *     a particular object key.
+   * @throws IOException if an exception happens; none of the blocks is
+   *     deleted in that case.
    */
   @Override
-  public void deleteBlock(final String key) throws IOException {
+  public void deleteBlocks(List<String> blockIDs) throws IOException {
     if (!nodeManager.isOutOfNodeChillMode()) {
       throw new SCMException("Unable to delete block while in chill mode",
           CHILL_MODE_EXCEPTION);
     }
 
     lock.lock();
+    LOG.info("Deleting blocks {}", String.join(",", blockIDs));
+    Map<String, List<String>> containerBlocks = new HashMap<>();
+    BatchOperation batch = new BatchOperation();
+    BatchOperation rollbackBatch = new BatchOperation();
+    // TODO: track the block size info so that we can reclaim the container
+    // TODO: used space when the block is deleted.
     try {
-      byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
-      if (containerBytes == null) {
-        throw new SCMException("Specified block key does not exist. key : " +
-            key, FAILED_TO_FIND_BLOCK);
+      for (String blockKey : blockIDs) {
+        byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey);
+        byte[] containerBytes = blockStore.get(blockKeyBytes);
+        if (containerBytes == null) {
+          throw new SCMException(
+              "Specified block key does not exist. key : " + blockKey,
+              FAILED_TO_FIND_BLOCK);
+        }
+        batch.delete(blockKeyBytes);
+        rollbackBatch.put(blockKeyBytes, containerBytes);
+
+        // Merge blocks into a container-to-blocks mapping, in preparation
+        // for persisting this info to the deletedBlocksLog.
+        String containerName = DFSUtil.bytes2String(containerBytes);
+        if (containerBlocks.containsKey(containerName)) {
+          containerBlocks.get(containerName).add(blockKey);
+        } else {
+          List<String> item = new ArrayList<>();
+          item.add(blockKey);
+          containerBlocks.put(containerName, item);
+        }
       }
-      // TODO: track the block size info so that we can reclaim the container
-      // TODO: used space when the block is deleted.
-      BatchOperation batch = new BatchOperation();
-      String deletedKeyName = getDeletedKeyName(key);
-      // Add a tombstone for the deleted key
-      batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
-      // Delete the block key
-      batch.delete(DFSUtil.string2Bytes(key));
+
+      // We update the SCM DB first, so if this step fails, we end up here:
+      // nothing gets into the delLog, so no blocks will be accidentally
+      // removed. If we wrote the log first, then once the log is written,
+      // the async deleting service could start to scan and might pick up
+      // some blocks for real deletion, which could cause data loss.
       blockStore.writeBatch(batch);
-      // TODO: Add async tombstone clean thread to send delete command to
-      // datanodes in the pipeline to clean up the blocks from containers.
+      try {
+        deletedBlockLog.addTransactions(containerBlocks);
+      } catch (IOException e) {
+        try {
+          // If the delLog update fails, we need to roll back the changes.
+          blockStore.writeBatch(rollbackBatch);
+        } catch (IOException rollbackException) {
+          // This is a corner case: addTransactions fails and the rollback
+          // also fails, which leaves these blocks in an inconsistent state.
+          // They were moved to the pending deletion state in the SCM DB but
+          // were not written into the delLog, so the real deletions will
+          // not be done. The blocks become invisible in the namespace but
+          // the actual data is not removed. We log an error here so an
+          // admin can manually check and fix such errors.
+          LOG.error("Blocks might be in an inconsistent state because"
+              + " they were moved to the pending deletion state in the SCM"
+              + " DB but not written into the delLog. Admin can manually"
+              + " add them into the delLog for deletion. Inconsistent"
+              + " block list: {}",
+              String.join(",", blockIDs), e);
+          throw rollbackException;
+        }
+        throw new IOException("Skip writing the deleted blocks info to"
+            + " the delLog because addTransactions failed. Batch skipped: "
+            + String.join(",", blockIDs), e);
+      }
       // TODO: Container report handling of the deleted blocks:
       // Remove tombstone and update open container usage.
      // We will revisit this when the closed container replication is done.
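The list-based API is exercised by TestBlockManager further down in this patch; the essential call pattern is:

```java
// From the updated test below: allocate one block, then delete it atomically.
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
blockManager.deleteBlocks(Collections.singletonList(block.getKey()));
// Retrieving the key afterwards fails with
// "Specified block key does not exist." since it has moved into the delLog.
```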
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index 9e268a63f66..bcbbe15d999 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.proto
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The DeletedBlockLog is a persisted log in SCM to keep tracking
@@ -86,6 +87,22 @@ public interface DeletedBlockLog extends Closeable {
   void addTransaction(String containerName, List<String> blocks)
       throws IOException;
 
+  /**
+   * Creates block deletion transactions for a set of containers, adds them
+   * to the log, and persists them atomically. An object key might be
+   * stored in multiple containers and multiple blocks; this API ensures
+   * that these updates are done in an atomic manner, so if any of them
+   * fails, the entire operation fails without any updates to the log.
+   * Note, this doesn't mean only one transaction is created; it creates
+   * multiple transactions (depending on the number of containers) that are
+   * persisted together (on success) or not at all (on failure).
+   *
+   * @param containerBlocksMap a map of container name to block IDs.
+   * @throws IOException
+   */
+  void addTransactions(Map<String, List<String>> containerBlocksMap)
+      throws IOException;
+
   /**
    * Returns the total number of valid transactions. A transaction is
    * considered to be valid as long as its count is in range [0, MAX_RETRY].
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index 738157d221e..e7e92d16eaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -209,6 +210,16 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     }
   }
 
+  private DeletedBlocksTransaction constructNewTransaction(long txID,
+      String containerName, List<String> blocks) {
+    return DeletedBlocksTransaction.newBuilder()
+        .setTxID(txID)
+        .setContainerName(containerName)
+        .addAllBlockID(blocks)
+        .setCount(0)
+        .build();
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -244,12 +255,8 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     BatchOperation batch = new BatchOperation();
     lock.lock();
     try {
-      DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
-          .setTxID(lastTxID + 1)
-          .setContainerName(containerName)
-          .addAllBlockID(blocks)
-          .setCount(0)
-          .build();
+      DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
+          containerName, blocks);
       byte[] key = Longs.toByteArray(lastTxID + 1);
 
       batch.put(key, tx.toByteArray());
@@ -284,6 +291,35 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param containerBlocksMap a map of container name to block IDs.
+   * @throws IOException
+   */
+  @Override
+  public void addTransactions(Map<String, List<String>> containerBlocksMap)
+      throws IOException {
+    BatchOperation batch = new BatchOperation();
+    lock.lock();
+    try {
+      long currentLatestID = lastTxID;
+      for (Map.Entry<String, List<String>> entry :
+          containerBlocksMap.entrySet()) {
+        currentLatestID += 1;
+        byte[] key = Longs.toByteArray(currentLatestID);
+        DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
+            entry.getKey(), entry.getValue());
+        batch.put(key, tx.toByteArray());
+      }
+      lastTxID = currentLatestID;
+      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
+      deletedStore.writeBatch(batch);
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @Override
   public void close() throws IOException {
     if (deletedStore != null) {
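A small usage sketch for addTransactions (container and block names are made up): one transaction per container is written, and the batch either fully commits or not at all:

```java
Map<String, List<String>> containerBlocksMap = new HashMap<>();
containerBlocksMap.put("container-1", Arrays.asList("block-1", "block-2"));
containerBlocksMap.put("container-2", Arrays.asList("block-3"));
deletedBlockLog.addTransactions(containerBlocksMap);
// Success: two new DeletedBlocksTransaction entries plus an updated
// LATEST_TXID. Failure: the log is left untouched.
```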
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 1ec574aa5ff..dcbf82a7ac3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -431,6 +431,17 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.key.deleting.limit.per.task</name>
+    <value>1000</value>
+    <description>
+      Maximum number of keys to be scanned by the key deleting service per
+      time interval in KSM. Those keys are sent to SCM to delete their
+      metadata and to generate transactions for the subsequent async
+      deletion between SCM and the DataNodes.
+    </description>
+  </property>
+
   <property>
     <name>dfs.container.ipc</name>
     <value>50011</value>
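The same limit can also be overridden programmatically, e.g. in tests (the value 500 is an arbitrary example):

```java
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK, 500);
```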
org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerData; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.helpers.KeyData; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.ksm.KeySpaceManager; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs; +import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo; import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Status; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.utils.OzoneUtils; @@ -48,8 +64,9 @@ import java.io.IOException; import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; -import java.text.ParseException; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -77,6 +94,9 @@ public class TestKeys { public static void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); + // Set short block deleting service interval to speed up deletions. + conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000); + path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName()); Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG); @@ -104,12 +124,12 @@ public class TestKeys { * * @return File. */ - static File createRandomDataFile(String dir, String fileName, long size) { + static File createRandomDataFile(String dir, String fileName, long size) + throws IOException { File tmpDir = new File(dir); - tmpDir.mkdirs(); + FileUtils.forceMkdir(tmpDir); File tmpFile = new File(tmpDir, fileName); - try { - FileOutputStream randFile = new FileOutputStream(tmpFile); + try (FileOutputStream randFile = new FileOutputStream(tmpFile)) { Random r = new Random(); for (int x = 0; x < size; x++) { char c = (char) (r.nextInt(26) + 'a'); @@ -176,8 +196,7 @@ public class TestKeys { * @return Returns the name of the new key that was created. 
* @throws OzoneException */ - private String putKey() throws - OzoneException { + private KsmKeyArgs putKey() throws Exception { String volumeName = OzoneUtils.getRequestID().toLowerCase(); client.setUserAuth("hdfs"); @@ -188,16 +207,21 @@ public class TestKeys { bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT); String fileName = OzoneUtils.getRequestID().toLowerCase(); + file = createRandomDataFile(dir, fileName, 1024); bucket.putKey(keyName, file); - return keyName; + return new KsmKeyArgs.Builder() + .setKeyName(keyName) + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setDataSize(1024) + .build(); } - } @Test - public void testPutKey() throws OzoneException { + public void testPutKey() throws Exception { // Test non-delimited keys runTestPutKey(new PutHelper(ozoneRestClient, path)); // Test key delimited by a random delimiter @@ -206,7 +230,7 @@ public class TestKeys { getMultiPartKey(delimiter))); } - static void runTestPutKey(PutHelper helper) throws OzoneException { + static void runTestPutKey(PutHelper helper) throws Exception { final OzoneRestClient client = helper.client; helper.putKey(); assertNotNull(helper.getBucket()); @@ -254,8 +278,7 @@ public class TestKeys { } @Test - public void testPutAndGetKeyWithDnRestart() - throws OzoneException, IOException, URISyntaxException { + public void testPutAndGetKeyWithDnRestart() throws Exception { runTestPutAndGetKeyWithDnRestart( new PutHelper(ozoneRestClient, path), ozoneCluster); String delimiter = RandomStringUtils.randomAscii(1); @@ -265,9 +288,8 @@ public class TestKeys { } static void runTestPutAndGetKeyWithDnRestart( - PutHelper helper, MiniOzoneCluster cluster) - throws OzoneException, IOException, URISyntaxException { - String keyName = helper.putKey(); + PutHelper helper, MiniOzoneCluster cluster) throws Exception { + String keyName = helper.putKey().getKeyName(); assertNotNull(helper.getBucket()); assertNotNull(helper.getFile()); @@ -281,37 +303,35 @@ public class TestKeys { helper.getBucket().getKey(keyName, newPath); - FileInputStream original = new FileInputStream(helper.getFile()); - FileInputStream downloaded = new FileInputStream(newPath.toFile()); - - - String originalHash = DigestUtils.sha256Hex(original); - String downloadedHash = DigestUtils.sha256Hex(downloaded); - - assertEquals( - "Sha256 does not match between original file and downloaded file.", - originalHash, downloadedHash); + try ( + FileInputStream original = new FileInputStream(helper.getFile()); + FileInputStream downloaded = new FileInputStream(newPath.toFile())) { + String originalHash = DigestUtils.sha256Hex(original); + String downloadedHash = DigestUtils.sha256Hex(downloaded); + assertEquals( + "Sha256 does not match between original file and downloaded file.", + originalHash, downloadedHash); + } } @Test - public void testPutAndGetKey() throws OzoneException, IOException { + public void testPutAndGetKey() throws Exception { runTestPutAndGetKey(new PutHelper(ozoneRestClient, path)); String delimiter = RandomStringUtils.randomAscii(1); runTestPutAndGetKey(new PutHelper(ozoneRestClient, path, getMultiPartKey(delimiter))); } - static void runTestPutAndGetKey(PutHelper helper) - throws OzoneException, IOException { + static void runTestPutAndGetKey(PutHelper helper) throws Exception { final OzoneRestClient client = helper.client; - String keyName = helper.putKey(); + String keyName = helper.putKey().getKeyName(); assertNotNull(helper.getBucket()); assertNotNull(helper.getFile()); - final String newFileName1 = helper.dir + 
"/" + final String newFileName1 = helper.dir + "/" + OzoneUtils.getRequestID().toLowerCase(); - final String newFileName2 = helper.dir + "/" + final String newFileName2 = helper.dir + "/" + OzoneUtils.getRequestID().toLowerCase(); Path newPath1 = Paths.get(newFileName1); @@ -322,54 +342,51 @@ public class TestKeys { client.getKey(helper.getVol().getVolumeName(), helper.getBucket().getBucketName(), keyName, newPath2); - FileInputStream original = new FileInputStream(helper.getFile()); - FileInputStream downloaded1 = new FileInputStream(newPath1.toFile()); - FileInputStream downloaded2 = new FileInputStream(newPath1.toFile()); + try (FileInputStream original = new FileInputStream(helper.getFile()); + FileInputStream downloaded1 = new FileInputStream(newPath1.toFile()); + FileInputStream downloaded2 = new FileInputStream(newPath1.toFile())) { + String originalHash = DigestUtils.sha256Hex(original); + String downloadedHash1 = DigestUtils.sha256Hex(downloaded1); + String downloadedHash2 = DigestUtils.sha256Hex(downloaded2); - String originalHash = DigestUtils.sha256Hex(original); - String downloadedHash1 = DigestUtils.sha256Hex(downloaded1); - String downloadedHash2 = DigestUtils.sha256Hex(downloaded2); + assertEquals( + "Sha256 does not match between original file and downloaded file.", + originalHash, downloadedHash1); + assertEquals( + "Sha256 does not match between original file and downloaded file.", + originalHash, downloadedHash2); - assertEquals( - "Sha256 does not match between original file and downloaded file.", - originalHash, downloadedHash1); - assertEquals( - "Sha256 does not match between original file and downloaded file.", - originalHash, downloadedHash2); + // test new get key with invalid volume/bucket name + try { + client.getKey("invalid-volume", helper.getBucket().getBucketName(), + keyName, newPath1); + fail("Get key should have thrown " + "when using invalid volume name."); + } catch (OzoneException e) { + GenericTestUtils + .assertExceptionContains(Status.KEY_NOT_FOUND.toString(), e); + } - // test new get key with invalid volume/bucket name - try { - client.getKey("invalid-volume", - helper.getBucket().getBucketName(), keyName, newPath1); - fail("Get key should have thrown " - + "when using invalid volume name."); - } catch (OzoneException e) { - GenericTestUtils.assertExceptionContains( - Status.KEY_NOT_FOUND.toString(), e); - } - - try { - client.getKey(helper.getVol().getVolumeName(), - "invalid-bucket", keyName, newPath1); - fail("Get key should have thrown " - + "when using invalid bucket name."); - } catch (OzoneException e) { - GenericTestUtils.assertExceptionContains( - Status.KEY_NOT_FOUND.toString(), e); + try { + client.getKey(helper.getVol().getVolumeName(), "invalid-bucket", + keyName, newPath1); + fail("Get key should have thrown " + "when using invalid bucket name."); + } catch (OzoneException e) { + GenericTestUtils.assertExceptionContains( + Status.KEY_NOT_FOUND.toString(), e); + } } } @Test - public void testPutAndDeleteKey() throws OzoneException, IOException { + public void testPutAndDeleteKey() throws Exception { runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path)); String delimiter = RandomStringUtils.randomAscii(1); runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path, getMultiPartKey(delimiter))); } - static void runTestPutAndDeleteKey(PutHelper helper) - throws OzoneException, IOException { - String keyName = helper.putKey(); + static void runTestPutAndDeleteKey(PutHelper helper) throws Exception { + String keyName = 
 
-  static void runTestPutAndDeleteKey(PutHelper helper)
-      throws OzoneException, IOException {
-    String keyName = helper.putKey();
+  static void runTestPutAndDeleteKey(PutHelper helper) throws Exception {
+    String keyName = helper.putKey().getKeyName();
     assertNotNull(helper.getBucket());
     assertNotNull(helper.getFile());
     helper.getBucket().deleteKey(keyName);
@@ -384,16 +401,14 @@
   }
 
   @Test
-  public void testPutAndListKey()
-      throws OzoneException, IOException, ParseException {
+  public void testPutAndListKey() throws Exception {
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAscii(1);
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestPutAndListKey(PutHelper helper)
-      throws OzoneException, IOException, ParseException {
+  static void runTestPutAndListKey(PutHelper helper) throws Exception {
     final OzoneRestClient client = helper.client;
     helper.putKey();
     assertNotNull(helper.getBucket());
@@ -473,17 +488,15 @@
   }
 
   @Test
-  public void testGetKeyInfo()
-      throws OzoneException, IOException, ParseException {
+  public void testGetKeyInfo() throws Exception {
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAscii(1);
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestGetKeyInfo(PutHelper helper)
-      throws OzoneException, ParseException {
-    String keyName = helper.putKey();
+  static void runTestGetKeyInfo(PutHelper helper) throws Exception {
+    String keyName = helper.putKey().getKeyName();
     assertNotNull(helper.getBucket());
     assertNotNull(helper.getFile());
@@ -500,4 +513,170 @@
         (OzoneUtils.formatDate(keyInfo.getObjectInfo().getModifiedOn())
             / 1000) >= (currentTime / 1000));
   }
+
+  // Tracks volume, bucket and key info, to help tests that create and
+  // delete keys.
+  private static class BucketKeys {
+
+    private Map<Pair<String, String>, List<String>> buckets;
+
+    BucketKeys() {
+      buckets = Maps.newHashMap();
+    }
+
+    void addKey(String volume, String bucket, String key) {
+      // check if this bucket exists
+      for (Map.Entry<Pair<String, String>, List<String>> entry :
+          buckets.entrySet()) {
+        if (entry.getKey().getValue().equals(bucket)) {
+          entry.getValue().add(key);
+          return;
+        }
+      }
+
+      // bucket does not exist yet
+      Pair<String, String> newBucket = new ImmutablePair<>(volume, bucket);
+      List<String> keyList = Lists.newArrayList();
+      keyList.add(key);
+      buckets.put(newBucket, keyList);
+    }
+
+    Set<Pair<String, String>> getAllBuckets() {
+      return buckets.keySet();
+    }
+
+    List<String> getBucketKeys(String bucketName) {
+      for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
+          .entrySet()) {
+        if (entry.getKey().getValue().equals(bucketName)) {
+          return entry.getValue();
+        }
+      }
+      return Lists.newArrayList();
+    }
+
+    int totalNumOfKeys() {
+      int count = 0;
+      for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
+          .entrySet()) {
+        count += entry.getValue().size();
+      }
+      return count;
+    }
+  }
+
+  private int countKsmKeys(KeySpaceManager ksm) throws IOException {
+    int totalCount = 0;
+    List<KsmVolumeArgs> volumes =
+        ksm.listAllVolumes(null, null, Integer.MAX_VALUE);
+    for (KsmVolumeArgs volume : volumes) {
+      List<KsmBucketInfo> buckets =
+          ksm.listBuckets(volume.getVolume(), null, null, Integer.MAX_VALUE);
+      for (KsmBucketInfo bucket : buckets) {
+        List<KsmKeyInfo> keys = ksm.listKeys(bucket.getVolumeName(),
+            bucket.getBucketName(), null, null, Integer.MAX_VALUE);
+        totalCount += keys.size();
+      }
+    }
+    return totalCount;
+  }
+
+  @Test
+  public void testDeleteKey() throws Exception {
+    KeySpaceManager ksm = ozoneCluster.getKeySpaceManager();
+    // To avoid interference from other test cases, collect the number of
+    // existing keys at the beginning.
+    int numOfExistedKeys = countKsmKeys(ksm);
+
+    // Keep tracking bucket keys info while creating them.
+    PutHelper helper = new PutHelper(ozoneRestClient, path);
+    BucketKeys bucketKeys = new BucketKeys();
+    for (int i = 0; i < 20; i++) {
+      KsmKeyArgs keyArgs = helper.putKey();
+      bucketKeys.addKey(keyArgs.getVolumeName(), keyArgs.getBucketName(),
+          keyArgs.getKeyName());
+    }
+
+    // There should be 20 keys in the buckets we just created.
+    Assert.assertEquals(20, bucketKeys.totalNumOfKeys());
+
+    int numOfCreatedKeys = 0;
+    OzoneContainer cm = ozoneCluster.getDataNodes().get(0)
+        .getOzoneContainerManager();
+
+    // Chunk files that are expected to be deleted later.
+    List<File> expectedChunkFiles = Lists.newArrayList();
+    // Iterate over all buckets, list all keys in each bucket,
+    // and count the total number of created keys.
+    Set<Pair<String, String>> buckets = bucketKeys.getAllBuckets();
+    for (Pair<String, String> buk : buckets) {
+      List<KsmKeyInfo> createdKeys =
+          ksm.listKeys(buk.getKey(), buk.getValue(), null, null, 20);
+
+      // Memorize chunks that have been created,
+      // so we can verify actual deletions on the DN side later.
+      for (KsmKeyInfo keyInfo : createdKeys) {
+        List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList();
+        for (KsmKeyLocationInfo location : locations) {
+          String containerName = location.getContainerName();
+          KeyData keyData = new KeyData(containerName, location.getBlockID());
+          KeyData blockInfo = cm.getContainerManager()
+              .getKeyManager().getKey(keyData);
+          ContainerData containerData = cm.getContainerManager()
+              .readContainer(containerName);
+          File dataDir = ContainerUtils
+              .getDataDirectory(containerData).toFile();
+          for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
+            File chunkFile = dataDir.toPath()
+                .resolve(chunkInfo.getChunkName()).toFile();
+            System.out.println("Chunk File created: "
+                + chunkFile.getAbsolutePath());
+            Assert.assertTrue(chunkFile.exists());
+            expectedChunkFiles.add(chunkFile);
+          }
+        }
+      }
+      numOfCreatedKeys += createdKeys.size();
+    }
+
+    // Ensure all keys are created.
+    Assert.assertEquals(20, numOfCreatedKeys);
+
+    // Ensure all keys are visible from KSM.
+    // The total should be numOfCreatedKeys + numOfExistedKeys.
+    Assert.assertEquals(20 + numOfExistedKeys, countKsmKeys(ksm));
+
+    // Delete all 20 keys that were just created.
+    int delCount = 20;
+    Set<Pair<String, String>> allBuckets = bucketKeys.getAllBuckets();
+    for (Pair<String, String> bucketInfo : allBuckets) {
+      List<String> bks = bucketKeys.getBucketKeys(bucketInfo.getValue());
+      for (String keyName : bks) {
+        if (delCount > 0) {
+          KsmKeyArgs arg =
+              new KsmKeyArgs.Builder().setVolumeName(bucketInfo.getKey())
+                  .setBucketName(bucketInfo.getValue()).setKeyName(keyName)
+                  .build();
+          ksm.deleteKey(arg);
+          delCount--;
+        }
+      }
+    }
+
+    // Keys should disappear from the KSM namespace quickly, because the
+    // actual block deletion happens asynchronously in the background.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        int num = countKsmKeys(ksm);
+        return num == numOfExistedKeys;
+      } catch (IOException e) {
+        return false;
+      }
+    }, 1000, 10000);
+
+    // It might take a while until all blocks are actually deleted;
+    // verify that all chunk files created earlier are removed from disk.
+    GenericTestUtils.waitFor(
+        () -> expectedChunkFiles.stream().allMatch(file -> !file.exists()),
+        1000, 60000);
+  }
 }
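// --- Editorial sketch, not part of the original patch. ---
// testDeleteKey() waits twice: a short wait for the synchronous namespace
// removal, then a longer one for the background block-deleting service to
// reclaim chunk files. GenericTestUtils.waitFor polls a boolean supplier
// until it holds or a timeout expires; a minimal stand-alone equivalent of
// that polling loop, for readers unfamiliar with the utility:
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class PollUtil {
  private PollUtil() {
  }

  static void pollUntil(Supplier<Boolean> check, long intervalMs,
      long timeoutMs) throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    // Re-probe the condition at a fixed interval until it holds or the
    // deadline passes, mirroring waitFor(check, intervalMs, timeoutMs).
    while (!check.get()) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException(
            "Condition not met within " + timeoutMs + " ms");
      }
      Thread.sleep(intervalMs);
    }
  }
}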
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
index e50ce7d618a..2c4ac1c6422 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.web.client;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.RatisTestHelper;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -28,10 +27,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.text.ParseException;
-
 import static org.apache.hadoop.ozone.web.client.TestKeys.*;
 
 /** The same as {@link TestKeys} except that this test is Ratis enabled. */
@@ -59,7 +54,7 @@
   }
 
   @Test
-  public void testPutKey() throws OzoneException {
+  public void testPutKey() throws Exception {
     runTestPutKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutKey(new PutHelper(ozoneRestClient, path,
@@ -67,8 +62,7 @@
   }
 
   @Test
-  public void testPutAndGetKeyWithDnRestart()
-      throws OzoneException, IOException, URISyntaxException {
+  public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(
         new PutHelper(ozoneRestClient, path), suite.getCluster());
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
@@ -78,7 +72,7 @@
   }
 
   @Test
-  public void testPutAndGetKey() throws OzoneException, IOException {
+  public void testPutAndGetKey() throws Exception {
     runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
@@ -86,7 +80,7 @@
   }
 
   @Test
-  public void testPutAndDeleteKey() throws OzoneException, IOException {
+  public void testPutAndDeleteKey() throws Exception {
     runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
@@ -94,8 +88,7 @@
   }
 
   @Test
-  public void testPutAndListKey()
-      throws OzoneException, IOException, ParseException {
+  public void testPutAndListKey() throws Exception {
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
@@ -103,8 +96,7 @@
   }
 
   @Test
-  public void testGetKeyInfo()
-      throws OzoneException, IOException, ParseException {
+  public void testGetKeyInfo() throws Exception {
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,