HDFS-12235. Ozone: DeleteKey-3: KSM SCM block deletion message and ACK interactions. Contributed by Weiwei Yang and Yuanbo Liu.
This commit is contained in:
parent
3aa846412d
commit
743be0d7c0
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.common;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.KeyBlocks;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A group of blocks relations relevant, e.g belong to a certain object key.
|
||||
*/
|
||||
public final class BlockGroup {
|
||||
|
||||
private String groupID;
|
||||
private List<String> blockIDs;
|
||||
private BlockGroup(String groupID, List<String> blockIDs) {
|
||||
this.groupID = groupID;
|
||||
this.blockIDs = blockIDs;
|
||||
}
|
||||
|
||||
public List<String> getBlockIDList() {
|
||||
return blockIDs;
|
||||
}
|
||||
|
||||
public String getGroupID() {
|
||||
return groupID;
|
||||
}
|
||||
|
||||
public KeyBlocks getProto() {
|
||||
return KeyBlocks.newBuilder().setKey(groupID)
|
||||
.addAllBlocks(blockIDs).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a KeyBlocks proto to a group of blocks.
|
||||
* @param proto KeyBlocks proto.
|
||||
* @return a group of blocks.
|
||||
*/
|
||||
public static BlockGroup getFromProto(KeyBlocks proto) {
|
||||
return BlockGroup.newBuilder().setKeyName(proto.getKey())
|
||||
.addAllBlockIDs(proto.getBlocksList()).build();
|
||||
}
|
||||
|
||||
public static Builder newBuilder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* BlockGroup instance builder.
|
||||
*/
|
||||
public static class Builder {
|
||||
|
||||
private String groupID;
|
||||
private List<String> blockIDs;
|
||||
|
||||
public Builder setKeyName(String blockGroupID) {
|
||||
this.groupID = blockGroupID;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder addAllBlockIDs(List<String> keyBlocks) {
|
||||
this.blockIDs = keyBlocks;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BlockGroup build() {
|
||||
return new BlockGroup(groupID, blockIDs);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.common;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Result to delete a group of blocks.
|
||||
*/
|
||||
public class DeleteBlockGroupResult {
|
||||
private String objectKey;
|
||||
private List<DeleteBlockResult> blockResultList;
|
||||
public DeleteBlockGroupResult(String objectKey,
|
||||
List<DeleteBlockResult> blockResultList) {
|
||||
this.objectKey = objectKey;
|
||||
this.blockResultList = blockResultList;
|
||||
}
|
||||
|
||||
public String getObjectKey() {
|
||||
return objectKey;
|
||||
}
|
||||
|
||||
public List<DeleteBlockResult> getBlockResultList() {
|
||||
return blockResultList;
|
||||
}
|
||||
|
||||
public List<DeleteScmBlockResult> getBlockResultProtoList() {
|
||||
List<DeleteScmBlockResult> resultProtoList =
|
||||
new ArrayList<>(blockResultList.size());
|
||||
for (DeleteBlockResult result : blockResultList) {
|
||||
DeleteScmBlockResult proto = DeleteScmBlockResult.newBuilder()
|
||||
.setKey(result.getKey())
|
||||
.setResult(result.getResult()).build();
|
||||
resultProtoList.add(proto);
|
||||
}
|
||||
return resultProtoList;
|
||||
}
|
||||
|
||||
public static List<DeleteBlockResult> convertBlockResultProto(
|
||||
List<DeleteScmBlockResult> results) {
|
||||
List<DeleteBlockResult> protoResults = new ArrayList<>(results.size());
|
||||
for (DeleteScmBlockResult result : results) {
|
||||
protoResults.add(new DeleteBlockResult(result.getKey(),
|
||||
result.getResult()));
|
||||
}
|
||||
return protoResults;
|
||||
}
|
||||
|
||||
/**
|
||||
* Only if all blocks are successfully deleted, this group is considered
|
||||
* to be successfully executed.
|
||||
*
|
||||
* @return true if all blocks are successfully deleted, false otherwise.
|
||||
*/
|
||||
public boolean isSuccess() {
|
||||
for (DeleteBlockResult result : blockResultList) {
|
||||
if (result.getResult() != Result.success) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A list of deletion failed block IDs.
|
||||
*/
|
||||
public List<String> getFailedBlocks() {
|
||||
List<String> failedBlocks = blockResultList.stream()
|
||||
.filter(result -> result.getResult() != Result.success)
|
||||
.map(DeleteBlockResult::getKey).collect(Collectors.toList());
|
||||
return failedBlocks;
|
||||
}
|
||||
}
|
|
@ -74,4 +74,8 @@ public final class KSMConfigKeys {
|
|||
"ozone.ksm.group.rights";
|
||||
public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
|
||||
OzoneAcl.OzoneACLRights.READ_WRITE;
|
||||
|
||||
public static final String OZONE_KEY_DELETING_LIMIT_PER_TASK =
|
||||
"ozone.key.deleting.limit.per.task";
|
||||
public static final int OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT = 1000;
|
||||
}
|
||||
|
|
|
@ -22,8 +22,9 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||
|
||||
/**
|
||||
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
|
||||
|
@ -53,13 +54,14 @@ public interface ScmBlockLocationProtocol {
|
|||
AllocatedBlock allocateBlock(long size) throws IOException;
|
||||
|
||||
/**
|
||||
* Delete the set of keys specified.
|
||||
* Delete blocks for a set of object keys.
|
||||
*
|
||||
* @param keys batch of block keys to delete.
|
||||
* @param keyBlocksInfoList Map of object key and its blocks.
|
||||
* @return list of block deletion results.
|
||||
* @throws IOException if there is any failure.
|
||||
*
|
||||
*/
|
||||
List<DeleteBlockResult> deleteBlocks(Set<String> keys) throws IOException;
|
||||
List<DeleteBlockGroupResult> deleteKeyBlocks(
|
||||
List<BlockGroup> keyBlocksInfoList) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -24,24 +24,24 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.AllocateScmBlockRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.AllocateScmBlockResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.DeleteScmBlocksRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.DeleteScmBlocksResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.DeleteScmBlockResult;
|
||||
.DeleteScmKeyBlocksResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.GetScmBlockLocationsRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.GetScmBlockLocationsResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.ScmLocatedBlockProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
|
||||
.KeyBlocks;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
|
||||
|
@ -50,6 +50,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class is the client-side translator to translate the requests made on
|
||||
|
@ -147,30 +148,32 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
|
|||
/**
|
||||
* Delete the set of keys specified.
|
||||
*
|
||||
* @param keys batch of block keys to delete.
|
||||
* @param keyBlocksInfoList batch of block keys to delete.
|
||||
* @return list of block deletion results.
|
||||
* @throws IOException if there is any failure.
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
|
||||
throws IOException {
|
||||
Preconditions.checkArgument(keys != null && !keys.isEmpty(),
|
||||
"keys to be deleted cannot be null or empty");
|
||||
DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
|
||||
.newBuilder()
|
||||
.addAllKeys(keys)
|
||||
.build();
|
||||
final DeleteScmBlocksResponseProto resp;
|
||||
public List<DeleteBlockGroupResult> deleteKeyBlocks(
|
||||
List<BlockGroup> keyBlocksInfoList) throws IOException {
|
||||
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
|
||||
.map(BlockGroup::getProto).collect(Collectors.toList());
|
||||
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
|
||||
.newBuilder().addAllKeyBlocks(keyBlocksProto).build();
|
||||
|
||||
final DeleteScmKeyBlocksResponseProto resp;
|
||||
try {
|
||||
resp = rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
|
||||
resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
List<DeleteBlockResult> results = new ArrayList(resp.getResultsCount());
|
||||
for (DeleteScmBlockResult result : resp.getResultsList()) {
|
||||
results.add(new DeleteBlockResult(result.getKey(), result.getResult()));
|
||||
}
|
||||
List<DeleteBlockGroupResult> results =
|
||||
new ArrayList<>(resp.getResultsCount());
|
||||
results.addAll(resp.getResultsList().stream().map(
|
||||
result -> new DeleteBlockGroupResult(result.getObjectKey(),
|
||||
DeleteBlockGroupResult
|
||||
.convertBlockResultProto(result.getBlockResultsList())))
|
||||
.collect(Collectors.toList()));
|
||||
return results;
|
||||
}
|
||||
|
||||
|
|
|
@ -64,17 +64,40 @@ message AllocateScmBlockRequestProto {
|
|||
}
|
||||
|
||||
/**
|
||||
* keys - batch of block keys to deleted
|
||||
* A delete key request sent by KSM to SCM, it contains
|
||||
* multiple number of keys (and their blocks).
|
||||
*/
|
||||
message DeleteScmBlocksRequestProto {
|
||||
repeated string keys = 1;
|
||||
message DeleteScmKeyBlocksRequestProto {
|
||||
repeated KeyBlocks keyBlocks = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* deletedKeys - keys that are deleted successfully
|
||||
* A object key and all its associated blocks.
|
||||
* We need to encapsulate object key name plus the blocks in this potocol
|
||||
* because SCM needs to response KSM with the keys it has deleted.
|
||||
* If the response only contains blocks, it will be very expensive for
|
||||
* KSM to figure out what keys have been deleted.
|
||||
*/
|
||||
message DeleteScmBlocksResponseProto {
|
||||
repeated DeleteScmBlockResult results = 1;
|
||||
message KeyBlocks {
|
||||
required string key = 1;
|
||||
repeated string blocks = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* A delete key response from SCM to KSM, it contains multiple child-results.
|
||||
* Each child-result represents a key deletion result, only if all blocks of
|
||||
* a key are successfully deleted, this key result is considered as succeed.
|
||||
*/
|
||||
message DeleteScmKeyBlocksResponseProto {
|
||||
repeated DeleteKeyBlocksResultProto results = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* A key deletion result. It contains all the block deletion results.
|
||||
*/
|
||||
message DeleteKeyBlocksResultProto {
|
||||
required string objectKey = 1;
|
||||
repeated DeleteScmBlockResult blockResults = 2;
|
||||
}
|
||||
|
||||
message DeleteScmBlockResult {
|
||||
|
@ -126,8 +149,8 @@ service ScmBlockLocationProtocolService {
|
|||
returns (AllocateScmBlockResponseProto);
|
||||
|
||||
/**
|
||||
* Deletes one or multiple block keys from SCM.
|
||||
* Deletes blocks for a set of object keys from SCM.
|
||||
*/
|
||||
rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
|
||||
returns (DeleteScmBlocksResponseProto);
|
||||
rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
|
||||
returns (DeleteScmKeyBlocksResponseProto);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.utils.BatchOperation;
|
||||
|
@ -206,4 +207,16 @@ public interface KSMMetadataManager {
|
|||
*/
|
||||
List<KsmVolumeArgs> listVolumes(String userName, String prefix,
|
||||
String startKey, int maxKeys) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns a list of pending deletion key info that ups to the given count.
|
||||
* Each entry is a {@link BlockGroup}, which contains the info about the
|
||||
* key name and all its associated block IDs. A pending deletion key is
|
||||
* stored with #deleting# prefix in KSM DB.
|
||||
*
|
||||
* @param count max number of keys to return.
|
||||
* @return a list of {@link BlockGroup} represent keys and blocks.
|
||||
* @throws IOException
|
||||
*/
|
||||
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
|
||||
}
|
||||
|
|
|
@ -21,9 +21,8 @@ import com.google.common.base.Strings;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.*;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||
|
@ -47,6 +46,7 @@ import java.util.Map;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
|
||||
|
@ -440,4 +440,28 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
|
|||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockGroup> getPendingDeletionKeys(final int count)
|
||||
throws IOException {
|
||||
List<BlockGroup> keyBlocksList = Lists.newArrayList();
|
||||
final MetadataKeyFilter deletingKeyFilter =
|
||||
new KeyPrefixFilter(DELETING_KEY_PREFIX);
|
||||
List<Map.Entry<byte[], byte[]>> rangeResult =
|
||||
store.getRangeKVs(null, count, deletingKeyFilter);
|
||||
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
|
||||
KsmKeyInfo info =
|
||||
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
|
||||
// Get block keys as a list.
|
||||
List<String> item = info.getKeyLocationList().stream()
|
||||
.map(KsmKeyLocationInfo::getBlockID)
|
||||
.collect(Collectors.toList());
|
||||
BlockGroup keyBlocks = BlockGroup.newBuilder()
|
||||
.setKeyName(DFSUtil.bytes2String(entry.getKey()))
|
||||
.addAllBlockIDs(item)
|
||||
.build();
|
||||
keyBlocksList.add(keyBlocks);
|
||||
}
|
||||
return keyBlocksList;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.apache.hadoop.utils.BackgroundTask;
|
||||
import org.apache.hadoop.utils.BackgroundTaskQueue;
|
||||
import org.apache.hadoop.utils.BackgroundTaskResult;
|
||||
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
|
||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
|
||||
|
||||
/**
|
||||
* This is the background service to delete keys.
|
||||
* Scan the metadata of ksm periodically to get
|
||||
* the keys with prefix "#deleting" and ask scm to
|
||||
* delete metadata accordingly, if scm returns
|
||||
* success for keys, then clean up those keys.
|
||||
*/
|
||||
public class KeyDeletingService extends BackgroundService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(KeyDeletingService.class);
|
||||
|
||||
// The thread pool size for key deleting service.
|
||||
private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
|
||||
|
||||
private final ScmBlockLocationProtocol scmClient;
|
||||
private final KeyManager manager;
|
||||
private final int keyLimitPerTask;
|
||||
|
||||
public KeyDeletingService(ScmBlockLocationProtocol scmClient,
|
||||
KeyManager manager, int serviceInterval,
|
||||
long serviceTimeout, Configuration conf) {
|
||||
super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
|
||||
KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
|
||||
this.scmClient = scmClient;
|
||||
this.manager = manager;
|
||||
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
|
||||
OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BackgroundTaskQueue getTasks() {
|
||||
BackgroundTaskQueue queue = new BackgroundTaskQueue();
|
||||
queue.add(new KeyDeletingTask());
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* A key deleting task scans KSM DB and looking for a certain number
|
||||
* of pending-deletion keys, sends these keys along with their associated
|
||||
* blocks to SCM for deletion. Once SCM confirms keys are deleted (once
|
||||
* SCM persisted the blocks info in its deletedBlockLog), it removes
|
||||
* these keys from the DB.
|
||||
*/
|
||||
private class KeyDeletingTask implements
|
||||
BackgroundTask<BackgroundTaskResult> {
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BackgroundTaskResult call() throws Exception {
|
||||
try {
|
||||
List<BlockGroup> keyBlocksList = manager
|
||||
.getPendingDeletionKeys(keyLimitPerTask);
|
||||
if (keyBlocksList.size() > 0) {
|
||||
LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size());
|
||||
List<DeleteBlockGroupResult> results =
|
||||
scmClient.deleteKeyBlocks(keyBlocksList);
|
||||
for (DeleteBlockGroupResult result : results) {
|
||||
if (result.isSuccess()) {
|
||||
try {
|
||||
// Purge key from KSM DB.
|
||||
manager.deletePendingDeletionKey(result.getObjectKey());
|
||||
LOG.info("Key {} deleted from KSM DB", result.getObjectKey());
|
||||
} catch (IOException e) {
|
||||
// if a pending deletion key is failed to delete,
|
||||
// print a warning here and retain it in this state,
|
||||
// so that it can be attempt to delete next time.
|
||||
LOG.warn("Failed to delete pending-deletion key {}",
|
||||
result.getObjectKey(), e);
|
||||
}
|
||||
} else {
|
||||
// Key deletion failed, retry in next interval.
|
||||
LOG.warn("Key {} deletion failed because some of the blocks"
|
||||
+ " were failed to delete, failed blocks: {}",
|
||||
result.getObjectKey(),
|
||||
String.join(",", result.getFailedBlocks()));
|
||||
}
|
||||
}
|
||||
return results::size;
|
||||
} else {
|
||||
LOG.info("No pending deletion key found in KSM");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to get pending deletion keys, retry in"
|
||||
+ " next interval", e);
|
||||
}
|
||||
return EmptyTaskResult.newResult();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
|
||||
|
@ -26,6 +27,17 @@ import java.util.List;
|
|||
* Handles key level commands.
|
||||
*/
|
||||
public interface KeyManager {
|
||||
|
||||
/**
|
||||
* Start key manager.
|
||||
*/
|
||||
void start();
|
||||
|
||||
/**
|
||||
* Stop key manager.
|
||||
*/
|
||||
void stop() throws IOException;
|
||||
|
||||
/**
|
||||
* Given the args of a key to put, return a pipeline for the key. Writes
|
||||
* the key to pipeline mapping to meta data.
|
||||
|
@ -89,4 +101,26 @@ public interface KeyManager {
|
|||
List<KsmKeyInfo> listKeys(String volumeName,
|
||||
String bucketName, String startKey, String keyPrefix, int maxKeys)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns a list of pending deletion key info that ups to the given count.
|
||||
* Each entry is a {@link BlockGroup}, which contains the info about the
|
||||
* key name and all its associated block IDs. A pending deletion key is
|
||||
* stored with #deleting# prefix in KSM DB.
|
||||
*
|
||||
* @param count max number of keys to return.
|
||||
* @return a list of {@link BlockGroup} representing keys and blocks.
|
||||
* @throws IOException
|
||||
*/
|
||||
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
|
||||
|
||||
/**
|
||||
* Deletes a pending deletion key by its name. This is often called when
|
||||
* key can be safely deleted from this layer. Once called, all footprints
|
||||
* of the key will be purged from KSM DB.
|
||||
*
|
||||
* @param objectKeyName object key name with #deleting# prefix.
|
||||
* @throws IOException if specified key doesn't exist or other I/O errors.
|
||||
*/
|
||||
void deletePendingDeletionKey(String objectKeyName) throws IOException;
|
||||
}
|
||||
|
|
|
@ -17,7 +17,10 @@
|
|||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
|
@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
|
|||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.apache.hadoop.utils.BatchOperation;
|
||||
import org.iq80.leveldb.DBException;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -35,7 +39,12 @@ import org.slf4j.LoggerFactory;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
|
||||
|
||||
|
@ -52,6 +61,7 @@ public class KeyManagerImpl implements KeyManager {
|
|||
private final ScmBlockLocationProtocol scmBlockClient;
|
||||
private final KSMMetadataManager metadataManager;
|
||||
private final long scmBlockSize;
|
||||
private final BackgroundService keyDeletingService;
|
||||
|
||||
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
||||
KSMMetadataManager metadataManager, OzoneConfiguration conf) {
|
||||
|
@ -59,6 +69,24 @@ public class KeyManagerImpl implements KeyManager {
|
|||
this.metadataManager = metadataManager;
|
||||
this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
|
||||
OZONE_SCM_BLOCK_SIZE_DEFAULT);
|
||||
int svcInterval = conf.getInt(
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
|
||||
long serviceTimeout = conf.getTimeDuration(
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
keyDeletingService = new KeyDeletingService(
|
||||
scmBlockClient, this, svcInterval, serviceTimeout, conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
keyDeletingService.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws IOException {
|
||||
keyDeletingService.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -181,7 +209,6 @@ public class KeyManagerImpl implements KeyManager {
|
|||
@Override
|
||||
public void deleteKey(KsmKeyArgs args) throws IOException {
|
||||
Preconditions.checkNotNull(args);
|
||||
|
||||
metadataManager.writeLock().lock();
|
||||
String volumeName = args.getVolumeName();
|
||||
String bucketName = args.getBucketName();
|
||||
|
@ -223,4 +250,39 @@ public class KeyManagerImpl implements KeyManager {
|
|||
metadataManager.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockGroup> getPendingDeletionKeys(final int count)
|
||||
throws IOException {
|
||||
metadataManager.readLock().lock();
|
||||
try {
|
||||
return metadataManager.getPendingDeletionKeys(count);
|
||||
} finally {
|
||||
metadataManager.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deletePendingDeletionKey(String objectKeyName)
|
||||
throws IOException{
|
||||
Preconditions.checkNotNull(objectKeyName);
|
||||
if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) {
|
||||
throw new IllegalArgumentException("Invalid key name,"
|
||||
+ " the name should be the key name with deleting prefix");
|
||||
}
|
||||
|
||||
// Simply removes the entry from KSM DB.
|
||||
metadataManager.writeLock().lock();
|
||||
try {
|
||||
byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName);
|
||||
byte[] delKeyValue = metadataManager.get(pendingDelKey);
|
||||
if (delKeyValue == null) {
|
||||
throw new IOException("Failed to delete key " + objectKeyName
|
||||
+ " because it is not found in DB");
|
||||
}
|
||||
metadataManager.delete(pendingDelKey);
|
||||
} finally {
|
||||
metadataManager.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -217,6 +217,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
|||
ksmRpcAddress));
|
||||
DefaultMetricsSystem.initialize("KeySpaceManager");
|
||||
metadataManager.start();
|
||||
keyManager.start();
|
||||
ksmRpcServer.start();
|
||||
httpServer.start();
|
||||
registerMXBean();
|
||||
|
@ -228,8 +229,9 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
|
|||
*/
|
||||
public void stop() {
|
||||
try {
|
||||
ksmRpcServer.stop();
|
||||
metadataManager.stop();
|
||||
ksmRpcServer.stop();
|
||||
keyManager.stop();
|
||||
httpServer.stop();
|
||||
metrics.unRegister();
|
||||
unregisterMXBean();
|
||||
|
|
|
@ -21,8 +21,10 @@ import com.google.common.collect.Sets;
|
|||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
|
@ -32,11 +34,9 @@ import org.apache.hadoop.ozone.protocol.proto
|
|||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlocksRequestProto;
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlocksResponseProto;
|
||||
import static org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
|
@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.protocol.proto
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class is the server-side translator that forwards requests received on
|
||||
|
@ -123,21 +124,22 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
|
|||
}
|
||||
|
||||
@Override
|
||||
public DeleteScmBlocksResponseProto deleteScmBlocks(
|
||||
RpcController controller, DeleteScmBlocksRequestProto req)
|
||||
public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
|
||||
RpcController controller, DeleteScmKeyBlocksRequestProto req)
|
||||
throws ServiceException {
|
||||
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
|
||||
req.getKeysCount());
|
||||
for (String key : req.getKeysList()) {
|
||||
keys.add(key);
|
||||
}
|
||||
DeleteScmBlocksResponseProto.Builder resp =
|
||||
DeleteScmBlocksResponseProto.newBuilder();
|
||||
DeleteScmKeyBlocksResponseProto.Builder resp =
|
||||
DeleteScmKeyBlocksResponseProto.newBuilder();
|
||||
try {
|
||||
final List<DeleteBlockResult> results = impl.deleteBlocks(keys);
|
||||
for (DeleteBlockResult result: results) {
|
||||
DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
|
||||
.newBuilder().setKey(result.getKey()).setResult(result.getResult());
|
||||
List<BlockGroup> infoList = req.getKeyBlocksList().stream()
|
||||
.map(BlockGroup::getFromProto).collect(Collectors.toList());
|
||||
final List<DeleteBlockGroupResult> results =
|
||||
impl.deleteKeyBlocks(infoList);
|
||||
for (DeleteBlockGroupResult result: results) {
|
||||
DeleteKeyBlocksResultProto.Builder deleteResult =
|
||||
DeleteKeyBlocksResultProto
|
||||
.newBuilder()
|
||||
.setObjectKey(result.getObjectKey())
|
||||
.addAllBlockResults(result.getBlockResultProtoList());
|
||||
resp.addResults(deleteResult.build());
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.hadoop.metrics2.util.MBeans;
|
|||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||
|
@ -85,6 +87,7 @@ import org.slf4j.LoggerFactory;
|
|||
import javax.management.ObjectName;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -838,19 +841,26 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
}
|
||||
|
||||
/**
|
||||
* Delete blocks.
|
||||
* @param keys batch of block keys to delete.
|
||||
* Delete blocks for a set of object keys.
|
||||
*
|
||||
* @param keyBlocksInfoList list of block keys with object keys to delete.
|
||||
* @return deletion results.
|
||||
*/
|
||||
public List<DeleteBlockResult> deleteBlocks(final Set<String> keys) {
|
||||
List<DeleteBlockResult> results = new LinkedList<>();
|
||||
for (String key: keys) {
|
||||
public List<DeleteBlockGroupResult> deleteKeyBlocks(
|
||||
List<BlockGroup> keyBlocksInfoList) throws IOException {
|
||||
LOG.info("SCM is informed by KSM to delete {} blocks",
|
||||
keyBlocksInfoList.size());
|
||||
List<DeleteBlockGroupResult> results = new ArrayList<>();
|
||||
for (BlockGroup keyBlocks : keyBlocksInfoList) {
|
||||
Result resultCode;
|
||||
try {
|
||||
scmBlockManager.deleteBlock(key);
|
||||
// We delete blocks in an atomic operation to prevent getting
|
||||
// into state like only a partial of blocks are deleted,
|
||||
// which will leave key in an inconsistent state.
|
||||
scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
|
||||
resultCode = Result.success;
|
||||
} catch (SCMException scmEx) {
|
||||
LOG.warn("Fail to delete block: {}", key, scmEx);
|
||||
LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
|
||||
switch (scmEx.getResult()) {
|
||||
case CHILL_MODE_EXCEPTION:
|
||||
resultCode = Result.chillMode;
|
||||
|
@ -862,10 +872,16 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
resultCode = Result.unknownFailure;
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Fail to delete block: {}", key, ex);
|
||||
LOG.warn("Fail to delete blocks for object key: {}",
|
||||
keyBlocks.getGroupID(), ex);
|
||||
resultCode = Result.unknownFailure;
|
||||
}
|
||||
results.add(new DeleteBlockResult(key, resultCode));
|
||||
List<DeleteBlockResult> blockResultList = new ArrayList<>();
|
||||
for (String blockKey : keyBlocks.getBlockIDList()) {
|
||||
blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
|
||||
}
|
||||
results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
|
||||
blockResultList));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -46,11 +47,16 @@ public interface BlockManager extends Closeable {
|
|||
Pipeline getBlock(String key) throws IOException;
|
||||
|
||||
/**
|
||||
* Given a key of the block, delete the block.
|
||||
* @param key - key of the block.
|
||||
* @throws IOException
|
||||
* Deletes a list of blocks in an atomic operation. Internally, SCM
|
||||
* writes these blocks into a {@link DeletedBlockLog} and deletes them
|
||||
* from SCM DB. If this is successful, given blocks are entering pending
|
||||
* deletion state and becomes invisible from SCM namespace.
|
||||
*
|
||||
* @param blockIDs block IDs. This is often the list of blocks of
|
||||
* a particular object key.
|
||||
* @throws IOException if exception happens, non of the blocks is deleted.
|
||||
*/
|
||||
void deleteBlock(String key) throws IOException;
|
||||
void deleteBlocks(List<String> blockIDs) throws IOException;
|
||||
|
||||
/**
|
||||
* @return the block deletion transaction log maintained by SCM.
|
||||
|
|
|
@ -45,6 +45,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
|
@ -382,8 +383,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
}
|
||||
// now we should have some candidates in ALLOCATE state
|
||||
if (candidates.size() == 0) {
|
||||
throw new SCMException("Fail to find any container to allocate block " +
|
||||
"of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
|
||||
throw new SCMException("Fail to find any container to allocate block "
|
||||
+ "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -475,35 +476,84 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
}
|
||||
|
||||
/**
|
||||
* Given a block key, delete a block.
|
||||
* @param key - block key assigned by SCM.
|
||||
* @throws IOException
|
||||
* Deletes a list of blocks in an atomic operation. Internally, SCM
|
||||
* writes these blocks into a {@link DeletedBlockLog} and deletes them
|
||||
* from SCM DB. If this is successful, given blocks are entering pending
|
||||
* deletion state and becomes invisible from SCM namespace.
|
||||
*
|
||||
* @param blockIDs block IDs. This is often the list of blocks of
|
||||
* a particular object key.
|
||||
* @throws IOException if exception happens, non of the blocks is deleted.
|
||||
*/
|
||||
@Override
|
||||
public void deleteBlock(final String key) throws IOException {
|
||||
public void deleteBlocks(List<String> blockIDs) throws IOException {
|
||||
if (!nodeManager.isOutOfNodeChillMode()) {
|
||||
throw new SCMException("Unable to delete block while in chill mode",
|
||||
CHILL_MODE_EXCEPTION);
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
LOG.info("Deleting blocks {}", String.join(",", blockIDs));
|
||||
Map<String, List<String>> containerBlocks = new HashMap<>();
|
||||
BatchOperation batch = new BatchOperation();
|
||||
BatchOperation rollbackBatch = new BatchOperation();
|
||||
// TODO: track the block size info so that we can reclaim the container
|
||||
// TODO: used space when the block is deleted.
|
||||
try {
|
||||
byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
|
||||
if (containerBytes == null) {
|
||||
throw new SCMException("Specified block key does not exist. key : " +
|
||||
key, FAILED_TO_FIND_BLOCK);
|
||||
for (String blockKey : blockIDs) {
|
||||
byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey);
|
||||
byte[] containerBytes = blockStore.get(blockKeyBytes);
|
||||
if (containerBytes == null) {
|
||||
throw new SCMException(
|
||||
"Specified block key does not exist. key : " + blockKey,
|
||||
FAILED_TO_FIND_BLOCK);
|
||||
}
|
||||
batch.delete(blockKeyBytes);
|
||||
rollbackBatch.put(blockKeyBytes, containerBytes);
|
||||
|
||||
// Merge blocks to a container to blocks mapping,
|
||||
// prepare to persist this info to the deletedBlocksLog.
|
||||
String containerName = DFSUtil.bytes2String(containerBytes);
|
||||
if (containerBlocks.containsKey(containerName)) {
|
||||
containerBlocks.get(containerName).add(blockKey);
|
||||
} else {
|
||||
List<String> item = new ArrayList<>();
|
||||
item.add(blockKey);
|
||||
containerBlocks.put(containerName, item);
|
||||
}
|
||||
}
|
||||
// TODO: track the block size info so that we can reclaim the container
|
||||
// TODO: used space when the block is deleted.
|
||||
BatchOperation batch = new BatchOperation();
|
||||
String deletedKeyName = getDeletedKeyName(key);
|
||||
// Add a tombstone for the deleted key
|
||||
batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
|
||||
// Delete the block key
|
||||
batch.delete(DFSUtil.string2Bytes(key));
|
||||
|
||||
// We update SCM DB first, so if this step fails, we end up here,
|
||||
// nothing gets into the delLog so no blocks will be accidentally
|
||||
// removed. If we write the log first, once log is written, the
|
||||
// async deleting service will start to scan and might be picking
|
||||
// up some blocks to do real deletions, that might cause data loss.
|
||||
blockStore.writeBatch(batch);
|
||||
// TODO: Add async tombstone clean thread to send delete command to
|
||||
// datanodes in the pipeline to clean up the blocks from containers.
|
||||
try {
|
||||
deletedBlockLog.addTransactions(containerBlocks);
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
// If delLog update is failed, we need to rollback the changes.
|
||||
blockStore.writeBatch(rollbackBatch);
|
||||
} catch (IOException rollbackException) {
|
||||
// This is a corner case. AddTX fails and rollback also fails,
|
||||
// this will leave these blocks in inconsistent state. They were
|
||||
// moved to pending deletion state in SCM DB but were not written
|
||||
// into delLog so real deletions would not be done. Blocks become
|
||||
// to be invisible from namespace but actual data are not removed.
|
||||
// We log an error here so admin can manually check and fix such
|
||||
// errors.
|
||||
LOG.error("Blocks might be in inconsistent state because"
|
||||
+ " they were moved to pending deletion state in SCM DB but"
|
||||
+ " not written into delLog. Admin can manually add them"
|
||||
+ " into delLog for deletions. Inconsistent block list: {}",
|
||||
String.join(",", blockIDs), e);
|
||||
throw rollbackException;
|
||||
}
|
||||
throw new IOException("Skip writing the deleted blocks info to"
|
||||
+ " the delLog because addTransaction fails. Batch skipped: "
|
||||
+ String.join(",", blockIDs), e);
|
||||
}
|
||||
// TODO: Container report handling of the deleted blocks:
|
||||
// Remove tombstone and update open container usage.
|
||||
// We will revisit this when the closed container replication is done.
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.proto
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The DeletedBlockLog is a persisted log in SCM to keep tracking
|
||||
|
@ -86,6 +87,22 @@ public interface DeletedBlockLog extends Closeable {
|
|||
void addTransaction(String containerName, List<String> blocks)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Creates block deletion transactions for a set of containers,
|
||||
* add into the log and persist them atomically. An object key
|
||||
* might be stored in multiple containers and multiple blocks,
|
||||
* this API ensures that these updates are done in atomic manner
|
||||
* so if any of them fails, the entire operation fails without
|
||||
* any updates to the log. Note, this doesn't mean to create only
|
||||
* one transaction, it creates multiple transactions (depends on the
|
||||
* number of containers) together (on success) or non (on failure).
|
||||
*
|
||||
* @param containerBlocksMap a map of containerBlocks.
|
||||
* @throws IOException
|
||||
*/
|
||||
void addTransactions(Map<String, List<String>> containerBlocksMap)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the total number of valid transactions. A transaction is
|
||||
* considered to be valid as long as its count is in range [0, MAX_RETRY].
|
||||
|
|
|
@ -37,6 +37,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
@ -209,6 +210,16 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|||
}
|
||||
}
|
||||
|
||||
private DeletedBlocksTransaction constructNewTransaction(long txID,
|
||||
String containerName, List<String> blocks) {
|
||||
return DeletedBlocksTransaction.newBuilder()
|
||||
.setTxID(txID)
|
||||
.setContainerName(containerName)
|
||||
.addAllBlockID(blocks)
|
||||
.setCount(0)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
|
@ -244,12 +255,8 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|||
BatchOperation batch = new BatchOperation();
|
||||
lock.lock();
|
||||
try {
|
||||
DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
|
||||
.setTxID(lastTxID + 1)
|
||||
.setContainerName(containerName)
|
||||
.addAllBlockID(blocks)
|
||||
.setCount(0)
|
||||
.build();
|
||||
DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
|
||||
containerName, blocks);
|
||||
byte[] key = Longs.toByteArray(lastTxID + 1);
|
||||
|
||||
batch.put(key, tx.toByteArray());
|
||||
|
@ -284,6 +291,35 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
* @param containerBlocksMap a map of containerBlocks.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public void addTransactions(Map<String, List<String>> containerBlocksMap)
|
||||
throws IOException {
|
||||
BatchOperation batch = new BatchOperation();
|
||||
lock.lock();
|
||||
try {
|
||||
long currentLatestID = lastTxID;
|
||||
for (Map.Entry<String, List<String>> entry :
|
||||
containerBlocksMap.entrySet()) {
|
||||
currentLatestID += 1;
|
||||
byte[] key = Longs.toByteArray(currentLatestID);
|
||||
DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
|
||||
entry.getKey(), entry.getValue());
|
||||
batch.put(key, tx.toByteArray());
|
||||
}
|
||||
lastTxID = currentLatestID;
|
||||
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
|
||||
deletedStore.writeBatch(batch);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (deletedStore != null) {
|
||||
|
|
|
@ -431,6 +431,17 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ozone.key.deleting.limit.per.task</name>
|
||||
<value>1000</value>
|
||||
<description>
|
||||
Maximum number of keys to be scanned by key deleting service per
|
||||
time interval in KSM. Those keys are sent to delete metadata and
|
||||
generate transactions in SCM for next async deletion between SCM
|
||||
and DataNode.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.container.ipc</name>
|
||||
<value>50011</value>
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.junit.rules.ExpectedException;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Collections;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.GB;
|
||||
|
@ -104,7 +105,7 @@ public class TestBlockManager {
|
|||
public void testDeleteBlock() throws Exception {
|
||||
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
|
||||
Assert.assertNotNull(block);
|
||||
blockManager.deleteBlock(block.getKey());
|
||||
blockManager.deleteBlocks(Collections.singletonList(block.getKey()));
|
||||
|
||||
// Deleted block can not be retrieved
|
||||
thrown.expectMessage("Specified block key does not exist.");
|
||||
|
|
|
@ -17,15 +17,31 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.web.client;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.commons.lang.math.RandomUtils;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Status;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
|
@ -48,8 +64,9 @@ import java.io.IOException;
|
|||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.text.ParseException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -77,6 +94,9 @@ public class TestKeys {
|
|||
public static void init() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
|
||||
// Set short block deleting service interval to speed up deletions.
|
||||
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
|
||||
|
||||
path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
|
||||
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
|
||||
|
||||
|
@ -104,12 +124,12 @@ public class TestKeys {
|
|||
*
|
||||
* @return File.
|
||||
*/
|
||||
static File createRandomDataFile(String dir, String fileName, long size) {
|
||||
static File createRandomDataFile(String dir, String fileName, long size)
|
||||
throws IOException {
|
||||
File tmpDir = new File(dir);
|
||||
tmpDir.mkdirs();
|
||||
FileUtils.forceMkdir(tmpDir);
|
||||
File tmpFile = new File(tmpDir, fileName);
|
||||
try {
|
||||
FileOutputStream randFile = new FileOutputStream(tmpFile);
|
||||
try (FileOutputStream randFile = new FileOutputStream(tmpFile)) {
|
||||
Random r = new Random();
|
||||
for (int x = 0; x < size; x++) {
|
||||
char c = (char) (r.nextInt(26) + 'a');
|
||||
|
@ -176,8 +196,7 @@ public class TestKeys {
|
|||
* @return Returns the name of the new key that was created.
|
||||
* @throws OzoneException
|
||||
*/
|
||||
private String putKey() throws
|
||||
OzoneException {
|
||||
private KsmKeyArgs putKey() throws Exception {
|
||||
String volumeName = OzoneUtils.getRequestID().toLowerCase();
|
||||
client.setUserAuth("hdfs");
|
||||
|
||||
|
@ -188,16 +207,21 @@ public class TestKeys {
|
|||
bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT);
|
||||
|
||||
String fileName = OzoneUtils.getRequestID().toLowerCase();
|
||||
|
||||
file = createRandomDataFile(dir, fileName, 1024);
|
||||
|
||||
bucket.putKey(keyName, file);
|
||||
return keyName;
|
||||
return new KsmKeyArgs.Builder()
|
||||
.setKeyName(keyName)
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setDataSize(1024)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutKey() throws OzoneException {
|
||||
public void testPutKey() throws Exception {
|
||||
// Test non-delimited keys
|
||||
runTestPutKey(new PutHelper(ozoneRestClient, path));
|
||||
// Test key delimited by a random delimiter
|
||||
|
@ -206,7 +230,7 @@ public class TestKeys {
|
|||
getMultiPartKey(delimiter)));
|
||||
}
|
||||
|
||||
static void runTestPutKey(PutHelper helper) throws OzoneException {
|
||||
static void runTestPutKey(PutHelper helper) throws Exception {
|
||||
final OzoneRestClient client = helper.client;
|
||||
helper.putKey();
|
||||
assertNotNull(helper.getBucket());
|
||||
|
@ -254,8 +278,7 @@ public class TestKeys {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndGetKeyWithDnRestart()
|
||||
throws OzoneException, IOException, URISyntaxException {
|
||||
public void testPutAndGetKeyWithDnRestart() throws Exception {
|
||||
runTestPutAndGetKeyWithDnRestart(
|
||||
new PutHelper(ozoneRestClient, path), ozoneCluster);
|
||||
String delimiter = RandomStringUtils.randomAscii(1);
|
||||
|
@ -265,9 +288,8 @@ public class TestKeys {
|
|||
}
|
||||
|
||||
static void runTestPutAndGetKeyWithDnRestart(
|
||||
PutHelper helper, MiniOzoneCluster cluster)
|
||||
throws OzoneException, IOException, URISyntaxException {
|
||||
String keyName = helper.putKey();
|
||||
PutHelper helper, MiniOzoneCluster cluster) throws Exception {
|
||||
String keyName = helper.putKey().getKeyName();
|
||||
assertNotNull(helper.getBucket());
|
||||
assertNotNull(helper.getFile());
|
||||
|
||||
|
@ -281,37 +303,35 @@ public class TestKeys {
|
|||
|
||||
helper.getBucket().getKey(keyName, newPath);
|
||||
|
||||
FileInputStream original = new FileInputStream(helper.getFile());
|
||||
FileInputStream downloaded = new FileInputStream(newPath.toFile());
|
||||
|
||||
|
||||
String originalHash = DigestUtils.sha256Hex(original);
|
||||
String downloadedHash = DigestUtils.sha256Hex(downloaded);
|
||||
|
||||
assertEquals(
|
||||
"Sha256 does not match between original file and downloaded file.",
|
||||
originalHash, downloadedHash);
|
||||
try (
|
||||
FileInputStream original = new FileInputStream(helper.getFile());
|
||||
FileInputStream downloaded = new FileInputStream(newPath.toFile())) {
|
||||
String originalHash = DigestUtils.sha256Hex(original);
|
||||
String downloadedHash = DigestUtils.sha256Hex(downloaded);
|
||||
assertEquals(
|
||||
"Sha256 does not match between original file and downloaded file.",
|
||||
originalHash, downloadedHash);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndGetKey() throws OzoneException, IOException {
|
||||
public void testPutAndGetKey() throws Exception {
|
||||
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAscii(1);
|
||||
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
|
||||
getMultiPartKey(delimiter)));
|
||||
}
|
||||
|
||||
static void runTestPutAndGetKey(PutHelper helper)
|
||||
throws OzoneException, IOException {
|
||||
static void runTestPutAndGetKey(PutHelper helper) throws Exception {
|
||||
final OzoneRestClient client = helper.client;
|
||||
|
||||
String keyName = helper.putKey();
|
||||
String keyName = helper.putKey().getKeyName();
|
||||
assertNotNull(helper.getBucket());
|
||||
assertNotNull(helper.getFile());
|
||||
|
||||
final String newFileName1 = helper.dir + "/"
|
||||
final String newFileName1 = helper.dir + "/"
|
||||
+ OzoneUtils.getRequestID().toLowerCase();
|
||||
final String newFileName2 = helper.dir + "/"
|
||||
final String newFileName2 = helper.dir + "/"
|
||||
+ OzoneUtils.getRequestID().toLowerCase();
|
||||
|
||||
Path newPath1 = Paths.get(newFileName1);
|
||||
|
@ -322,54 +342,51 @@ public class TestKeys {
|
|||
client.getKey(helper.getVol().getVolumeName(),
|
||||
helper.getBucket().getBucketName(), keyName, newPath2);
|
||||
|
||||
FileInputStream original = new FileInputStream(helper.getFile());
|
||||
FileInputStream downloaded1 = new FileInputStream(newPath1.toFile());
|
||||
FileInputStream downloaded2 = new FileInputStream(newPath1.toFile());
|
||||
try (FileInputStream original = new FileInputStream(helper.getFile());
|
||||
FileInputStream downloaded1 = new FileInputStream(newPath1.toFile());
|
||||
FileInputStream downloaded2 = new FileInputStream(newPath1.toFile())) {
|
||||
String originalHash = DigestUtils.sha256Hex(original);
|
||||
String downloadedHash1 = DigestUtils.sha256Hex(downloaded1);
|
||||
String downloadedHash2 = DigestUtils.sha256Hex(downloaded2);
|
||||
|
||||
String originalHash = DigestUtils.sha256Hex(original);
|
||||
String downloadedHash1 = DigestUtils.sha256Hex(downloaded1);
|
||||
String downloadedHash2 = DigestUtils.sha256Hex(downloaded2);
|
||||
assertEquals(
|
||||
"Sha256 does not match between original file and downloaded file.",
|
||||
originalHash, downloadedHash1);
|
||||
assertEquals(
|
||||
"Sha256 does not match between original file and downloaded file.",
|
||||
originalHash, downloadedHash2);
|
||||
|
||||
assertEquals(
|
||||
"Sha256 does not match between original file and downloaded file.",
|
||||
originalHash, downloadedHash1);
|
||||
assertEquals(
|
||||
"Sha256 does not match between original file and downloaded file.",
|
||||
originalHash, downloadedHash2);
|
||||
// test new get key with invalid volume/bucket name
|
||||
try {
|
||||
client.getKey("invalid-volume", helper.getBucket().getBucketName(),
|
||||
keyName, newPath1);
|
||||
fail("Get key should have thrown " + "when using invalid volume name.");
|
||||
} catch (OzoneException e) {
|
||||
GenericTestUtils
|
||||
.assertExceptionContains(Status.KEY_NOT_FOUND.toString(), e);
|
||||
}
|
||||
|
||||
// test new get key with invalid volume/bucket name
|
||||
try {
|
||||
client.getKey("invalid-volume",
|
||||
helper.getBucket().getBucketName(), keyName, newPath1);
|
||||
fail("Get key should have thrown "
|
||||
+ "when using invalid volume name.");
|
||||
} catch (OzoneException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
Status.KEY_NOT_FOUND.toString(), e);
|
||||
}
|
||||
|
||||
try {
|
||||
client.getKey(helper.getVol().getVolumeName(),
|
||||
"invalid-bucket", keyName, newPath1);
|
||||
fail("Get key should have thrown "
|
||||
+ "when using invalid bucket name.");
|
||||
} catch (OzoneException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
Status.KEY_NOT_FOUND.toString(), e);
|
||||
try {
|
||||
client.getKey(helper.getVol().getVolumeName(), "invalid-bucket",
|
||||
keyName, newPath1);
|
||||
fail("Get key should have thrown " + "when using invalid bucket name.");
|
||||
} catch (OzoneException e) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
Status.KEY_NOT_FOUND.toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndDeleteKey() throws OzoneException, IOException {
|
||||
public void testPutAndDeleteKey() throws Exception {
|
||||
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAscii(1);
|
||||
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
|
||||
getMultiPartKey(delimiter)));
|
||||
}
|
||||
|
||||
static void runTestPutAndDeleteKey(PutHelper helper)
|
||||
throws OzoneException, IOException {
|
||||
String keyName = helper.putKey();
|
||||
static void runTestPutAndDeleteKey(PutHelper helper) throws Exception {
|
||||
String keyName = helper.putKey().getKeyName();
|
||||
assertNotNull(helper.getBucket());
|
||||
assertNotNull(helper.getFile());
|
||||
helper.getBucket().deleteKey(keyName);
|
||||
|
@ -384,16 +401,14 @@ public class TestKeys {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndListKey()
|
||||
throws OzoneException, IOException, ParseException {
|
||||
public void testPutAndListKey() throws Exception {
|
||||
runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAscii(1);
|
||||
runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
|
||||
getMultiPartKey(delimiter)));
|
||||
}
|
||||
|
||||
static void runTestPutAndListKey(PutHelper helper)
|
||||
throws OzoneException, IOException, ParseException {
|
||||
static void runTestPutAndListKey(PutHelper helper) throws Exception {
|
||||
final OzoneRestClient client = helper.client;
|
||||
helper.putKey();
|
||||
assertNotNull(helper.getBucket());
|
||||
|
@ -473,17 +488,15 @@ public class TestKeys {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyInfo()
|
||||
throws OzoneException, IOException, ParseException {
|
||||
public void testGetKeyInfo() throws Exception {
|
||||
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAscii(1);
|
||||
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,
|
||||
getMultiPartKey(delimiter)));
|
||||
}
|
||||
|
||||
static void runTestGetKeyInfo(PutHelper helper)
|
||||
throws OzoneException, ParseException {
|
||||
String keyName = helper.putKey();
|
||||
static void runTestGetKeyInfo(PutHelper helper) throws Exception {
|
||||
String keyName = helper.putKey().getKeyName();
|
||||
assertNotNull(helper.getBucket());
|
||||
assertNotNull(helper.getFile());
|
||||
|
||||
|
@ -500,4 +513,170 @@ public class TestKeys {
|
|||
(OzoneUtils.formatDate(keyInfo.getObjectInfo().getModifiedOn())
|
||||
/ 1000) >= (currentTime / 1000));
|
||||
}
|
||||
|
||||
// Volume, bucket, keys info that helps for test create/delete keys.
|
||||
private static class BucketKeys {
|
||||
|
||||
private Map<Pair<String, String>, List<String>> buckets;
|
||||
|
||||
BucketKeys() {
|
||||
buckets = Maps.newHashMap();
|
||||
}
|
||||
|
||||
void addKey(String volume, String bucket, String key) {
|
||||
// check if this bucket exists
|
||||
for (Map.Entry<Pair<String, String>, List<String>> entry :
|
||||
buckets.entrySet()) {
|
||||
if (entry.getKey().getValue().equals(bucket)) {
|
||||
entry.getValue().add(key);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// bucket not exist
|
||||
Pair<String, String> newBucket = new ImmutablePair(volume, bucket);
|
||||
List<String> keyList = Lists.newArrayList();
|
||||
keyList.add(key);
|
||||
buckets.put(newBucket, keyList);
|
||||
}
|
||||
|
||||
Set<Pair<String, String>> getAllBuckets() {
|
||||
return buckets.keySet();
|
||||
}
|
||||
|
||||
List<String> getBucketKeys(String bucketName) {
|
||||
for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
|
||||
.entrySet()) {
|
||||
if (entry.getKey().getValue().equals(bucketName)) {
|
||||
return entry.getValue();
|
||||
}
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
int totalNumOfKeys() {
|
||||
int count = 0;
|
||||
for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
|
||||
.entrySet()) {
|
||||
count += entry.getValue().size();
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
private int countKsmKeys(KeySpaceManager ksm) throws IOException {
|
||||
int totalCount = 0;
|
||||
List<KsmVolumeArgs> volumes =
|
||||
ksm.listAllVolumes(null, null, Integer.MAX_VALUE);
|
||||
for (KsmVolumeArgs volume : volumes) {
|
||||
List<KsmBucketInfo> buckets =
|
||||
ksm.listBuckets(volume.getVolume(), null, null, Integer.MAX_VALUE);
|
||||
for (KsmBucketInfo bucket : buckets) {
|
||||
List<KsmKeyInfo> keys = ksm.listKeys(bucket.getVolumeName(),
|
||||
bucket.getBucketName(), null, null, Integer.MAX_VALUE);
|
||||
totalCount += keys.size();
|
||||
}
|
||||
}
|
||||
return totalCount;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteKey() throws Exception {
|
||||
KeySpaceManager ksm = ozoneCluster.getKeySpaceManager();
|
||||
// To avoid interference from other test cases,
|
||||
// we collect number of existing keys at the beginning
|
||||
int numOfExistedKeys = countKsmKeys(ksm);
|
||||
|
||||
// Keep tracking bucket keys info while creating them
|
||||
PutHelper helper = new PutHelper(ozoneRestClient, path);
|
||||
BucketKeys bucketKeys = new BucketKeys();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
KsmKeyArgs keyArgs = helper.putKey();
|
||||
bucketKeys.addKey(keyArgs.getVolumeName(), keyArgs.getBucketName(),
|
||||
keyArgs.getKeyName());
|
||||
}
|
||||
|
||||
// There should be 20 keys in the buckets we just created.
|
||||
Assert.assertEquals(20, bucketKeys.totalNumOfKeys());
|
||||
|
||||
int numOfCreatedKeys = 0;
|
||||
OzoneContainer cm = ozoneCluster.getDataNodes().get(0)
|
||||
.getOzoneContainerManager();
|
||||
|
||||
// Expected to delete chunk file list.
|
||||
List<File> expectedChunkFiles = Lists.newArrayList();
|
||||
// Iterate over all buckets, and list all keys in each bucket,
|
||||
// count the total number of created keys.
|
||||
Set<Pair<String, String>> buckets = bucketKeys.getAllBuckets();
|
||||
for (Pair<String, String> buk : buckets) {
|
||||
List<KsmKeyInfo> createdKeys =
|
||||
ksm.listKeys(buk.getKey(), buk.getValue(), null, null, 20);
|
||||
|
||||
// Memorize chunks that has been created,
|
||||
// so we can verify actual deletions at DN side later.
|
||||
for (KsmKeyInfo keyInfo : createdKeys) {
|
||||
List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList();
|
||||
for (KsmKeyLocationInfo location : locations) {
|
||||
String containerName = location.getContainerName();
|
||||
KeyData keyData = new KeyData(containerName, location.getBlockID());
|
||||
KeyData blockInfo = cm.getContainerManager()
|
||||
.getKeyManager().getKey(keyData);
|
||||
ContainerData containerData = cm.getContainerManager()
|
||||
.readContainer(containerName);
|
||||
File dataDir = ContainerUtils
|
||||
.getDataDirectory(containerData).toFile();
|
||||
for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
|
||||
File chunkFile = dataDir.toPath()
|
||||
.resolve(chunkInfo.getChunkName()).toFile();
|
||||
System.out.println("Chunk File created: "
|
||||
+ chunkFile.getAbsolutePath());
|
||||
Assert.assertTrue(chunkFile.exists());
|
||||
expectedChunkFiles.add(chunkFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
numOfCreatedKeys += createdKeys.size();
|
||||
}
|
||||
|
||||
// Ensure all keys are created.
|
||||
Assert.assertEquals(20, numOfCreatedKeys);
|
||||
|
||||
// Ensure all keys are visible from KSM.
|
||||
// Total number should be numOfCreated + numOfExisted
|
||||
Assert.assertEquals(20 + numOfExistedKeys, countKsmKeys(ksm));
|
||||
|
||||
// Delete 10 keys
|
||||
int delCount = 20;
|
||||
Set<Pair<String, String>> allBuckets = bucketKeys.getAllBuckets();
|
||||
for (Pair<String, String> bucketInfo : allBuckets) {
|
||||
List<String> bks = bucketKeys.getBucketKeys(bucketInfo.getValue());
|
||||
for (String keyName : bks) {
|
||||
if (delCount > 0) {
|
||||
KsmKeyArgs arg =
|
||||
new KsmKeyArgs.Builder().setVolumeName(bucketInfo.getKey())
|
||||
.setBucketName(bucketInfo.getValue()).setKeyName(keyName)
|
||||
.build();
|
||||
ksm.deleteKey(arg);
|
||||
delCount--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// It should be pretty quick that keys are removed from KSM namespace,
|
||||
// because actual deletion happens in async mode.
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
int num = countKsmKeys(ksm);
|
||||
return num == (numOfExistedKeys);
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}, 1000, 10000);
|
||||
|
||||
// It might take a while until all blocks are actually deleted,
|
||||
// verify all chunk files created earlier are removed from disk.
|
||||
GenericTestUtils.waitFor(
|
||||
() -> expectedChunkFiles.stream().allMatch(file -> !file.exists()),
|
||||
1000, 60000);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.web.client;
|
|||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.RatisTestHelper;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
|
@ -28,10 +27,6 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.text.ParseException;
|
||||
|
||||
import static org.apache.hadoop.ozone.web.client.TestKeys.*;
|
||||
|
||||
/** The same as {@link TestKeys} except that this test is Ratis enabled. */
|
||||
|
@ -59,7 +54,7 @@ public class TestKeysRatis {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutKey() throws OzoneException {
|
||||
public void testPutKey() throws Exception {
|
||||
runTestPutKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAlphanumeric(1);
|
||||
runTestPutKey(new PutHelper(ozoneRestClient, path,
|
||||
|
@ -67,8 +62,7 @@ public class TestKeysRatis {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndGetKeyWithDnRestart()
|
||||
throws OzoneException, IOException, URISyntaxException {
|
||||
public void testPutAndGetKeyWithDnRestart() throws Exception {
|
||||
runTestPutAndGetKeyWithDnRestart(
|
||||
new PutHelper(ozoneRestClient, path), suite.getCluster());
|
||||
String delimiter = RandomStringUtils.randomAlphanumeric(1);
|
||||
|
@ -78,7 +72,7 @@ public class TestKeysRatis {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndGetKey() throws OzoneException, IOException {
|
||||
public void testPutAndGetKey() throws Exception {
|
||||
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAlphanumeric(1);
|
||||
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
|
||||
|
@ -86,7 +80,7 @@ public class TestKeysRatis {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndDeleteKey() throws OzoneException, IOException {
|
||||
public void testPutAndDeleteKey() throws Exception {
|
||||
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAlphanumeric(1);
|
||||
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
|
||||
|
@ -94,8 +88,7 @@ public class TestKeysRatis {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndListKey()
|
||||
throws OzoneException, IOException, ParseException {
|
||||
public void testPutAndListKey() throws Exception {
|
||||
runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAlphanumeric(1);
|
||||
runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
|
||||
|
@ -103,8 +96,7 @@ public class TestKeysRatis {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyInfo()
|
||||
throws OzoneException, IOException, ParseException {
|
||||
public void testGetKeyInfo() throws Exception {
|
||||
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
|
||||
String delimiter = RandomStringUtils.randomAlphanumeric(1);
|
||||
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,
|
||||
|
|
Loading…
Reference in New Issue