HDFS-12235. Ozone: DeleteKey-3: KSM SCM block deletion message and ACK interactions. Contributed by Weiwei Yang and Yuanbo Liu.

This commit is contained in:
Weiwei Yang 2017-09-11 14:28:20 +08:00
parent a73531be1e
commit 81f71b4795
22 changed files with 977 additions and 187 deletions

View File

@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.common;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.KeyBlocks;
import java.util.List;
/**
* A group of blocks relations relevant, e.g belong to a certain object key.
*/
public final class BlockGroup {
private String groupID;
private List<String> blockIDs;
private BlockGroup(String groupID, List<String> blockIDs) {
this.groupID = groupID;
this.blockIDs = blockIDs;
}
public List<String> getBlockIDList() {
return blockIDs;
}
public String getGroupID() {
return groupID;
}
public KeyBlocks getProto() {
return KeyBlocks.newBuilder().setKey(groupID)
.addAllBlocks(blockIDs).build();
}
/**
* Parses a KeyBlocks proto to a group of blocks.
* @param proto KeyBlocks proto.
* @return a group of blocks.
*/
public static BlockGroup getFromProto(KeyBlocks proto) {
return BlockGroup.newBuilder().setKeyName(proto.getKey())
.addAllBlockIDs(proto.getBlocksList()).build();
}
public static Builder newBuilder() {
return new Builder();
}
/**
* BlockGroup instance builder.
*/
public static class Builder {
private String groupID;
private List<String> blockIDs;
public Builder setKeyName(String blockGroupID) {
this.groupID = blockGroupID;
return this;
}
public Builder addAllBlockIDs(List<String> keyBlocks) {
this.blockIDs = keyBlocks;
return this;
}
public BlockGroup build() {
return new BlockGroup(groupID, blockIDs);
}
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.common;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* Result to delete a group of blocks.
*/
public class DeleteBlockGroupResult {
private String objectKey;
private List<DeleteBlockResult> blockResultList;
public DeleteBlockGroupResult(String objectKey,
List<DeleteBlockResult> blockResultList) {
this.objectKey = objectKey;
this.blockResultList = blockResultList;
}
public String getObjectKey() {
return objectKey;
}
public List<DeleteBlockResult> getBlockResultList() {
return blockResultList;
}
public List<DeleteScmBlockResult> getBlockResultProtoList() {
List<DeleteScmBlockResult> resultProtoList =
new ArrayList<>(blockResultList.size());
for (DeleteBlockResult result : blockResultList) {
DeleteScmBlockResult proto = DeleteScmBlockResult.newBuilder()
.setKey(result.getKey())
.setResult(result.getResult()).build();
resultProtoList.add(proto);
}
return resultProtoList;
}
public static List<DeleteBlockResult> convertBlockResultProto(
List<DeleteScmBlockResult> results) {
List<DeleteBlockResult> protoResults = new ArrayList<>(results.size());
for (DeleteScmBlockResult result : results) {
protoResults.add(new DeleteBlockResult(result.getKey(),
result.getResult()));
}
return protoResults;
}
/**
* Only if all blocks are successfully deleted, this group is considered
* to be successfully executed.
*
* @return true if all blocks are successfully deleted, false otherwise.
*/
public boolean isSuccess() {
for (DeleteBlockResult result : blockResultList) {
if (result.getResult() != Result.success) {
return false;
}
}
return true;
}
/**
* @return A list of deletion failed block IDs.
*/
public List<String> getFailedBlocks() {
List<String> failedBlocks = blockResultList.stream()
.filter(result -> result.getResult() != Result.success)
.map(DeleteBlockResult::getKey).collect(Collectors.toList());
return failedBlocks;
}
}

View File

@ -74,4 +74,8 @@ public final class KSMConfigKeys {
"ozone.ksm.group.rights";
public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
OzoneAcl.OzoneACLRights.READ_WRITE;
public static final String OZONE_KEY_DELETING_LIMIT_PER_TASK =
"ozone.key.deleting.limit.per.task";
public static final int OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT = 1000;
}

View File

@ -22,8 +22,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
/**
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@ -53,13 +54,14 @@ public interface ScmBlockLocationProtocol {
AllocatedBlock allocateBlock(long size) throws IOException;
/**
* Delete the set of keys specified.
* Delete blocks for a set of object keys.
*
* @param keys batch of block keys to delete.
* @param keyBlocksInfoList Map of object key and its blocks.
* @return list of block deletion results.
* @throws IOException if there is any failure.
*
*/
List<DeleteBlockResult> deleteBlocks(Set<String> keys) throws IOException;
List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException;
}

View File

@ -24,24 +24,24 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlocksResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmBlockResult;
.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.GetScmBlockLocationsResponseProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.ScmLocatedBlockProto;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
.KeyBlocks;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@ -50,6 +50,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is the client-side translator to translate the requests made on
@ -147,30 +148,32 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
/**
* Delete the set of keys specified.
*
* @param keys batch of block keys to delete.
* @param keyBlocksInfoList batch of block keys to delete.
* @return list of block deletion results.
* @throws IOException if there is any failure.
*
*/
@Override
public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
throws IOException {
Preconditions.checkArgument(keys != null && !keys.isEmpty(),
"keys to be deleted cannot be null or empty");
DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
.newBuilder()
.addAllKeys(keys)
.build();
final DeleteScmBlocksResponseProto resp;
public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
.map(BlockGroup::getProto).collect(Collectors.toList());
DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
.newBuilder().addAllKeyBlocks(keyBlocksProto).build();
final DeleteScmKeyBlocksResponseProto resp;
try {
resp = rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
List<DeleteBlockResult> results = new ArrayList(resp.getResultsCount());
for (DeleteScmBlockResult result : resp.getResultsList()) {
results.add(new DeleteBlockResult(result.getKey(), result.getResult()));
}
List<DeleteBlockGroupResult> results =
new ArrayList<>(resp.getResultsCount());
results.addAll(resp.getResultsList().stream().map(
result -> new DeleteBlockGroupResult(result.getObjectKey(),
DeleteBlockGroupResult
.convertBlockResultProto(result.getBlockResultsList())))
.collect(Collectors.toList()));
return results;
}

View File

@ -64,17 +64,40 @@ message AllocateScmBlockRequestProto {
}
/**
* keys - batch of block keys to deleted
* A delete key request sent by KSM to SCM, it contains
* multiple number of keys (and their blocks).
*/
message DeleteScmBlocksRequestProto {
repeated string keys = 1;
message DeleteScmKeyBlocksRequestProto {
repeated KeyBlocks keyBlocks = 1;
}
/**
* deletedKeys - keys that are deleted successfully
* A object key and all its associated blocks.
* We need to encapsulate object key name plus the blocks in this potocol
* because SCM needs to response KSM with the keys it has deleted.
* If the response only contains blocks, it will be very expensive for
* KSM to figure out what keys have been deleted.
*/
message DeleteScmBlocksResponseProto {
repeated DeleteScmBlockResult results = 1;
message KeyBlocks {
required string key = 1;
repeated string blocks = 2;
}
/**
* A delete key response from SCM to KSM, it contains multiple child-results.
* Each child-result represents a key deletion result, only if all blocks of
* a key are successfully deleted, this key result is considered as succeed.
*/
message DeleteScmKeyBlocksResponseProto {
repeated DeleteKeyBlocksResultProto results = 1;
}
/**
* A key deletion result. It contains all the block deletion results.
*/
message DeleteKeyBlocksResultProto {
required string objectKey = 1;
repeated DeleteScmBlockResult blockResults = 2;
}
message DeleteScmBlockResult {
@ -126,8 +149,8 @@ service ScmBlockLocationProtocolService {
returns (AllocateScmBlockResponseProto);
/**
* Deletes one or multiple block keys from SCM.
* Deletes blocks for a set of object keys from SCM.
*/
rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
returns (DeleteScmBlocksResponseProto);
rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
returns (DeleteScmKeyBlocksResponseProto);
}

View File

@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.utils.BatchOperation;
@ -206,4 +207,16 @@ public interface KSMMetadataManager {
*/
List<KsmVolumeArgs> listVolumes(String userName, String prefix,
String startKey, int maxKeys) throws IOException;
/**
* Returns a list of pending deletion key info that ups to the given count.
* Each entry is a {@link BlockGroup}, which contains the info about the
* key name and all its associated block IDs. A pending deletion key is
* stored with #deleting# prefix in KSM DB.
*
* @param count max number of keys to return.
* @return a list of {@link BlockGroup} represent keys and blocks.
* @throws IOException
*/
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
}

View File

@ -21,9 +21,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.*;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
@ -47,6 +46,7 @@ import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
@ -440,4 +440,28 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
return builder.build();
}
@Override
public List<BlockGroup> getPendingDeletionKeys(final int count)
throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
final MetadataKeyFilter deletingKeyFilter =
new KeyPrefixFilter(DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> rangeResult =
store.getRangeKVs(null, count, deletingKeyFilter);
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
KsmKeyInfo info =
KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
// Get block keys as a list.
List<String> item = info.getKeyLocationList().stream()
.map(KsmKeyLocationInfo::getBlockID)
.collect(Collectors.toList());
BlockGroup keyBlocks = BlockGroup.newBuilder()
.setKeyName(DFSUtil.bytes2String(entry.getKey()))
.addAllBlockIDs(item)
.build();
keyBlocksList.add(keyBlocks);
}
return keyBlocksList;
}
}

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult;
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
/**
* This is the background service to delete keys.
* Scan the metadata of ksm periodically to get
* the keys with prefix "#deleting" and ask scm to
* delete metadata accordingly, if scm returns
* success for keys, then clean up those keys.
*/
public class KeyDeletingService extends BackgroundService {
private static final Logger LOG =
LoggerFactory.getLogger(KeyDeletingService.class);
// The thread pool size for key deleting service.
private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
private final ScmBlockLocationProtocol scmClient;
private final KeyManager manager;
private final int keyLimitPerTask;
public KeyDeletingService(ScmBlockLocationProtocol scmClient,
KeyManager manager, int serviceInterval,
long serviceTimeout, Configuration conf) {
super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
this.scmClient = scmClient;
this.manager = manager;
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
}
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
queue.add(new KeyDeletingTask());
return queue;
}
/**
* A key deleting task scans KSM DB and looking for a certain number
* of pending-deletion keys, sends these keys along with their associated
* blocks to SCM for deletion. Once SCM confirms keys are deleted (once
* SCM persisted the blocks info in its deletedBlockLog), it removes
* these keys from the DB.
*/
private class KeyDeletingTask implements
BackgroundTask<BackgroundTaskResult> {
@Override
public int getPriority() {
return 0;
}
@Override
public BackgroundTaskResult call() throws Exception {
try {
List<BlockGroup> keyBlocksList = manager
.getPendingDeletionKeys(keyLimitPerTask);
if (keyBlocksList.size() > 0) {
LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size());
List<DeleteBlockGroupResult> results =
scmClient.deleteKeyBlocks(keyBlocksList);
for (DeleteBlockGroupResult result : results) {
if (result.isSuccess()) {
try {
// Purge key from KSM DB.
manager.deletePendingDeletionKey(result.getObjectKey());
LOG.info("Key {} deleted from KSM DB", result.getObjectKey());
} catch (IOException e) {
// if a pending deletion key is failed to delete,
// print a warning here and retain it in this state,
// so that it can be attempt to delete next time.
LOG.warn("Failed to delete pending-deletion key {}",
result.getObjectKey(), e);
}
} else {
// Key deletion failed, retry in next interval.
LOG.warn("Key {} deletion failed because some of the blocks"
+ " were failed to delete, failed blocks: {}",
result.getObjectKey(),
String.join(",", result.getFailedBlocks()));
}
}
return results::size;
} else {
LOG.info("No pending deletion key found in KSM");
}
} catch (IOException e) {
LOG.error("Unable to get pending deletion keys, retry in"
+ " next interval", e);
}
return EmptyTaskResult.newResult();
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
@ -26,6 +27,17 @@ import java.util.List;
* Handles key level commands.
*/
public interface KeyManager {
/**
* Start key manager.
*/
void start();
/**
* Stop key manager.
*/
void stop() throws IOException;
/**
* Given the args of a key to put, return a pipeline for the key. Writes
* the key to pipeline mapping to meta data.
@ -89,4 +101,26 @@ public interface KeyManager {
List<KsmKeyInfo> listKeys(String volumeName,
String bucketName, String startKey, String keyPrefix, int maxKeys)
throws IOException;
/**
* Returns a list of pending deletion key info that ups to the given count.
* Each entry is a {@link BlockGroup}, which contains the info about the
* key name and all its associated block IDs. A pending deletion key is
* stored with #deleting# prefix in KSM DB.
*
* @param count max number of keys to return.
* @return a list of {@link BlockGroup} representing keys and blocks.
* @throws IOException
*/
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
/**
* Deletes a pending deletion key by its name. This is often called when
* key can be safely deleted from this layer. Once called, all footprints
* of the key will be purged from KSM DB.
*
* @param objectKeyName object key name with #deleting# prefix.
* @throws IOException if specified key doesn't exist or other I/O errors.
*/
void deletePendingDeletionKey(String objectKeyName) throws IOException;
}

View File

@ -17,7 +17,10 @@
package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.OzoneConfiguration;
@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BatchOperation;
import org.iq80.leveldb.DBException;
import org.slf4j.Logger;
@ -35,7 +39,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
@ -52,6 +61,7 @@ public class KeyManagerImpl implements KeyManager {
private final ScmBlockLocationProtocol scmBlockClient;
private final KSMMetadataManager metadataManager;
private final long scmBlockSize;
private final BackgroundService keyDeletingService;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
KSMMetadataManager metadataManager, OzoneConfiguration conf) {
@ -59,6 +69,24 @@ public class KeyManagerImpl implements KeyManager {
this.metadataManager = metadataManager;
this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
OZONE_SCM_BLOCK_SIZE_DEFAULT);
int svcInterval = conf.getInt(
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
long serviceTimeout = conf.getTimeDuration(
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
keyDeletingService = new KeyDeletingService(
scmBlockClient, this, svcInterval, serviceTimeout, conf);
}
@Override
public void start() {
keyDeletingService.start();
}
@Override
public void stop() throws IOException {
keyDeletingService.shutdown();
}
@Override
@ -181,7 +209,6 @@ public class KeyManagerImpl implements KeyManager {
@Override
public void deleteKey(KsmKeyArgs args) throws IOException {
Preconditions.checkNotNull(args);
metadataManager.writeLock().lock();
String volumeName = args.getVolumeName();
String bucketName = args.getBucketName();
@ -223,4 +250,39 @@ public class KeyManagerImpl implements KeyManager {
metadataManager.readLock().unlock();
}
}
@Override
public List<BlockGroup> getPendingDeletionKeys(final int count)
throws IOException {
metadataManager.readLock().lock();
try {
return metadataManager.getPendingDeletionKeys(count);
} finally {
metadataManager.readLock().unlock();
}
}
@Override
public void deletePendingDeletionKey(String objectKeyName)
throws IOException{
Preconditions.checkNotNull(objectKeyName);
if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) {
throw new IllegalArgumentException("Invalid key name,"
+ " the name should be the key name with deleting prefix");
}
// Simply removes the entry from KSM DB.
metadataManager.writeLock().lock();
try {
byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName);
byte[] delKeyValue = metadataManager.get(pendingDelKey);
if (delKeyValue == null) {
throw new IOException("Failed to delete key " + objectKeyName
+ " because it is not found in DB");
}
metadataManager.delete(pendingDelKey);
} finally {
metadataManager.writeLock().unlock();
}
}
}

View File

@ -217,6 +217,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
ksmRpcAddress));
DefaultMetricsSystem.initialize("KeySpaceManager");
metadataManager.start();
keyManager.start();
ksmRpcServer.start();
httpServer.start();
registerMXBean();
@ -228,8 +229,9 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
*/
public void stop() {
try {
ksmRpcServer.stop();
metadataManager.stop();
ksmRpcServer.stop();
keyManager.stop();
httpServer.stop();
metrics.unRegister();
unregisterMXBean();

View File

@ -21,8 +21,10 @@ import com.google.common.collect.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
@ -32,11 +34,9 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlocksRequestProto;
.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlocksResponseProto;
import static org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.protocol.proto
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is the server-side translator that forwards requests received on
@ -123,21 +124,22 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
}
@Override
public DeleteScmBlocksResponseProto deleteScmBlocks(
RpcController controller, DeleteScmBlocksRequestProto req)
public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
RpcController controller, DeleteScmKeyBlocksRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
DeleteScmBlocksResponseProto.Builder resp =
DeleteScmBlocksResponseProto.newBuilder();
DeleteScmKeyBlocksResponseProto.Builder resp =
DeleteScmKeyBlocksResponseProto.newBuilder();
try {
final List<DeleteBlockResult> results = impl.deleteBlocks(keys);
for (DeleteBlockResult result: results) {
DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
.newBuilder().setKey(result.getKey()).setResult(result.getResult());
List<BlockGroup> infoList = req.getKeyBlocksList().stream()
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
impl.deleteKeyBlocks(infoList);
for (DeleteBlockGroupResult result: results) {
DeleteKeyBlocksResultProto.Builder deleteResult =
DeleteKeyBlocksResultProto
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
resp.addResults(deleteResult.build());
}
} catch (IOException ex) {

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@ -85,6 +87,7 @@ import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@ -838,19 +841,26 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
* Delete blocks.
* @param keys batch of block keys to delete.
* Delete blocks for a set of object keys.
*
* @param keyBlocksInfoList list of block keys with object keys to delete.
* @return deletion results.
*/
public List<DeleteBlockResult> deleteBlocks(final Set<String> keys) {
List<DeleteBlockResult> results = new LinkedList<>();
for (String key: keys) {
public List<DeleteBlockGroupResult> deleteKeyBlocks(
List<BlockGroup> keyBlocksInfoList) throws IOException {
LOG.info("SCM is informed by KSM to delete {} blocks",
keyBlocksInfoList.size());
List<DeleteBlockGroupResult> results = new ArrayList<>();
for (BlockGroup keyBlocks : keyBlocksInfoList) {
Result resultCode;
try {
scmBlockManager.deleteBlock(key);
// We delete blocks in an atomic operation to prevent getting
// into state like only a partial of blocks are deleted,
// which will leave key in an inconsistent state.
scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
resultCode = Result.success;
} catch (SCMException scmEx) {
LOG.warn("Fail to delete block: {}", key, scmEx);
LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
switch (scmEx.getResult()) {
case CHILL_MODE_EXCEPTION:
resultCode = Result.chillMode;
@ -862,10 +872,16 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
resultCode = Result.unknownFailure;
}
} catch (IOException ex) {
LOG.warn("Fail to delete block: {}", key, ex);
LOG.warn("Fail to delete blocks for object key: {}",
keyBlocks.getGroupID(), ex);
resultCode = Result.unknownFailure;
}
results.add(new DeleteBlockResult(key, resultCode));
List<DeleteBlockResult> blockResultList = new ArrayList<>();
for (String blockKey : keyBlocks.getBlockIDList()) {
blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
}
results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
blockResultList));
}
return results;
}

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
*
@ -46,11 +47,16 @@ public interface BlockManager extends Closeable {
Pipeline getBlock(String key) throws IOException;
/**
* Given a key of the block, delete the block.
* @param key - key of the block.
* @throws IOException
* Deletes a list of blocks in an atomic operation. Internally, SCM
* writes these blocks into a {@link DeletedBlockLog} and deletes them
* from SCM DB. If this is successful, given blocks are entering pending
* deletion state and becomes invisible from SCM namespace.
*
* @param blockIDs block IDs. This is often the list of blocks of
* a particular object key.
* @throws IOException if exception happens, non of the blocks is deleted.
*/
void deleteBlock(String key) throws IOException;
void deleteBlocks(List<String> blockIDs) throws IOException;
/**
* @return the block deletion transaction log maintained by SCM.

View File

@ -45,6 +45,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@ -382,8 +383,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
// now we should have some candidates in ALLOCATE state
if (candidates.size() == 0) {
throw new SCMException("Fail to find any container to allocate block " +
"of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
throw new SCMException("Fail to find any container to allocate block "
+ "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
}
}
@ -475,35 +476,84 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
}
/**
* Given a block key, delete a block.
* @param key - block key assigned by SCM.
* @throws IOException
* Deletes a list of blocks in an atomic operation. Internally, SCM
* writes these blocks into a {@link DeletedBlockLog} and deletes them
* from SCM DB. If this is successful, given blocks are entering pending
* deletion state and becomes invisible from SCM namespace.
*
* @param blockIDs block IDs. This is often the list of blocks of
* a particular object key.
* @throws IOException if exception happens, non of the blocks is deleted.
*/
@Override
public void deleteBlock(final String key) throws IOException {
public void deleteBlocks(List<String> blockIDs) throws IOException {
if (!nodeManager.isOutOfNodeChillMode()) {
throw new SCMException("Unable to delete block while in chill mode",
CHILL_MODE_EXCEPTION);
}
lock.lock();
LOG.info("Deleting blocks {}", String.join(",", blockIDs));
Map<String, List<String>> containerBlocks = new HashMap<>();
BatchOperation batch = new BatchOperation();
BatchOperation rollbackBatch = new BatchOperation();
// TODO: track the block size info so that we can reclaim the container
// TODO: used space when the block is deleted.
try {
byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
if (containerBytes == null) {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
for (String blockKey : blockIDs) {
byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey);
byte[] containerBytes = blockStore.get(blockKeyBytes);
if (containerBytes == null) {
throw new SCMException(
"Specified block key does not exist. key : " + blockKey,
FAILED_TO_FIND_BLOCK);
}
batch.delete(blockKeyBytes);
rollbackBatch.put(blockKeyBytes, containerBytes);
// Merge blocks to a container to blocks mapping,
// prepare to persist this info to the deletedBlocksLog.
String containerName = DFSUtil.bytes2String(containerBytes);
if (containerBlocks.containsKey(containerName)) {
containerBlocks.get(containerName).add(blockKey);
} else {
List<String> item = new ArrayList<>();
item.add(blockKey);
containerBlocks.put(containerName, item);
}
}
// TODO: track the block size info so that we can reclaim the container
// TODO: used space when the block is deleted.
BatchOperation batch = new BatchOperation();
String deletedKeyName = getDeletedKeyName(key);
// Add a tombstone for the deleted key
batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
// Delete the block key
batch.delete(DFSUtil.string2Bytes(key));
// We update SCM DB first, so if this step fails, we end up here,
// nothing gets into the delLog so no blocks will be accidentally
// removed. If we write the log first, once log is written, the
// async deleting service will start to scan and might be picking
// up some blocks to do real deletions, that might cause data loss.
blockStore.writeBatch(batch);
// TODO: Add async tombstone clean thread to send delete command to
// datanodes in the pipeline to clean up the blocks from containers.
try {
deletedBlockLog.addTransactions(containerBlocks);
} catch (IOException e) {
try {
// If delLog update is failed, we need to rollback the changes.
blockStore.writeBatch(rollbackBatch);
} catch (IOException rollbackException) {
// This is a corner case. AddTX fails and rollback also fails,
// this will leave these blocks in inconsistent state. They were
// moved to pending deletion state in SCM DB but were not written
// into delLog so real deletions would not be done. Blocks become
// to be invisible from namespace but actual data are not removed.
// We log an error here so admin can manually check and fix such
// errors.
LOG.error("Blocks might be in inconsistent state because"
+ " they were moved to pending deletion state in SCM DB but"
+ " not written into delLog. Admin can manually add them"
+ " into delLog for deletions. Inconsistent block list: {}",
String.join(",", blockIDs), e);
throw rollbackException;
}
throw new IOException("Skip writing the deleted blocks info to"
+ " the delLog because addTransaction fails. Batch skipped: "
+ String.join(",", blockIDs), e);
}
// TODO: Container report handling of the deleted blocks:
// Remove tombstone and update open container usage.
// We will revisit this when the closed container replication is done.

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.proto
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* The DeletedBlockLog is a persisted log in SCM to keep tracking
@ -86,6 +87,22 @@ public interface DeletedBlockLog extends Closeable {
void addTransaction(String containerName, List<String> blocks)
throws IOException;
/**
* Creates block deletion transactions for a set of containers,
* add into the log and persist them atomically. An object key
* might be stored in multiple containers and multiple blocks,
* this API ensures that these updates are done in atomic manner
* so if any of them fails, the entire operation fails without
* any updates to the log. Note, this doesn't mean to create only
* one transaction, it creates multiple transactions (depends on the
* number of containers) together (on success) or non (on failure).
*
* @param containerBlocksMap a map of containerBlocks.
* @throws IOException
*/
void addTransactions(Map<String, List<String>> containerBlocksMap)
throws IOException;
/**
* Returns the total number of valid transactions. A transaction is
* considered to be valid as long as its count is in range [0, MAX_RETRY].

View File

@ -37,6 +37,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -209,6 +210,16 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
}
private DeletedBlocksTransaction constructNewTransaction(long txID,
String containerName, List<String> blocks) {
return DeletedBlocksTransaction.newBuilder()
.setTxID(txID)
.setContainerName(containerName)
.addAllBlockID(blocks)
.setCount(0)
.build();
}
/**
* {@inheritDoc}
*
@ -244,12 +255,8 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
.setTxID(lastTxID + 1)
.setContainerName(containerName)
.addAllBlockID(blocks)
.setCount(0)
.build();
DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
containerName, blocks);
byte[] key = Longs.toByteArray(lastTxID + 1);
batch.put(key, tx.toByteArray());
@ -284,6 +291,35 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
}
}
/**
* {@inheritDoc}
*
* @param containerBlocksMap a map of containerBlocks.
* @throws IOException
*/
@Override
public void addTransactions(Map<String, List<String>> containerBlocksMap)
throws IOException {
BatchOperation batch = new BatchOperation();
lock.lock();
try {
long currentLatestID = lastTxID;
for (Map.Entry<String, List<String>> entry :
containerBlocksMap.entrySet()) {
currentLatestID += 1;
byte[] key = Longs.toByteArray(currentLatestID);
DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
entry.getKey(), entry.getValue());
batch.put(key, tx.toByteArray());
}
lastTxID = currentLatestID;
batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
deletedStore.writeBatch(batch);
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
if (deletedStore != null) {

View File

@ -431,6 +431,17 @@
</description>
</property>
<property>
<name>ozone.key.deleting.limit.per.task</name>
<value>1000</value>
<description>
Maximum number of keys to be scanned by key deleting service per
time interval in KSM. Those keys are sent to delete metadata and
generate transactions in SCM for next async deletion between SCM
and DataNode.
</description>
</property>
<property>
<name>dfs.container.ipc</name>
<value>50011</value>

View File

@ -37,6 +37,7 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.UUID;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
@ -104,7 +105,7 @@ public class TestBlockManager {
public void testDeleteBlock() throws Exception {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
Assert.assertNotNull(block);
blockManager.deleteBlock(block.getKey());
blockManager.deleteBlocks(Collections.singletonList(block.getKey()));
// Deleted block can not be retrieved
thrown.expectMessage("Specified block key does not exist.");

View File

@ -17,15 +17,31 @@
*/
package org.apache.hadoop.ozone.web.client;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@ -48,8 +64,9 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@ -77,6 +94,9 @@ public class TestKeys {
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
// Set short block deleting service interval to speed up deletions.
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
@ -104,12 +124,12 @@ public class TestKeys {
*
* @return File.
*/
static File createRandomDataFile(String dir, String fileName, long size) {
static File createRandomDataFile(String dir, String fileName, long size)
throws IOException {
File tmpDir = new File(dir);
tmpDir.mkdirs();
FileUtils.forceMkdir(tmpDir);
File tmpFile = new File(tmpDir, fileName);
try {
FileOutputStream randFile = new FileOutputStream(tmpFile);
try (FileOutputStream randFile = new FileOutputStream(tmpFile)) {
Random r = new Random();
for (int x = 0; x < size; x++) {
char c = (char) (r.nextInt(26) + 'a');
@ -176,8 +196,7 @@ public class TestKeys {
* @return Returns the name of the new key that was created.
* @throws OzoneException
*/
private String putKey() throws
OzoneException {
private KsmKeyArgs putKey() throws Exception {
String volumeName = OzoneUtils.getRequestID().toLowerCase();
client.setUserAuth("hdfs");
@ -188,16 +207,21 @@ public class TestKeys {
bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT);
String fileName = OzoneUtils.getRequestID().toLowerCase();
file = createRandomDataFile(dir, fileName, 1024);
bucket.putKey(keyName, file);
return keyName;
return new KsmKeyArgs.Builder()
.setKeyName(keyName)
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setDataSize(1024)
.build();
}
}
@Test
public void testPutKey() throws OzoneException {
public void testPutKey() throws Exception {
// Test non-delimited keys
runTestPutKey(new PutHelper(ozoneRestClient, path));
// Test key delimited by a random delimiter
@ -206,7 +230,7 @@ public class TestKeys {
getMultiPartKey(delimiter)));
}
static void runTestPutKey(PutHelper helper) throws OzoneException {
static void runTestPutKey(PutHelper helper) throws Exception {
final OzoneRestClient client = helper.client;
helper.putKey();
assertNotNull(helper.getBucket());
@ -254,8 +278,7 @@ public class TestKeys {
}
@Test
public void testPutAndGetKeyWithDnRestart()
throws OzoneException, IOException, URISyntaxException {
public void testPutAndGetKeyWithDnRestart() throws Exception {
runTestPutAndGetKeyWithDnRestart(
new PutHelper(ozoneRestClient, path), ozoneCluster);
String delimiter = RandomStringUtils.randomAscii(1);
@ -265,9 +288,8 @@ public class TestKeys {
}
static void runTestPutAndGetKeyWithDnRestart(
PutHelper helper, MiniOzoneCluster cluster)
throws OzoneException, IOException, URISyntaxException {
String keyName = helper.putKey();
PutHelper helper, MiniOzoneCluster cluster) throws Exception {
String keyName = helper.putKey().getKeyName();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
@ -281,37 +303,35 @@ public class TestKeys {
helper.getBucket().getKey(keyName, newPath);
FileInputStream original = new FileInputStream(helper.getFile());
FileInputStream downloaded = new FileInputStream(newPath.toFile());
String originalHash = DigestUtils.sha256Hex(original);
String downloadedHash = DigestUtils.sha256Hex(downloaded);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash);
try (
FileInputStream original = new FileInputStream(helper.getFile());
FileInputStream downloaded = new FileInputStream(newPath.toFile())) {
String originalHash = DigestUtils.sha256Hex(original);
String downloadedHash = DigestUtils.sha256Hex(downloaded);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash);
}
}
@Test
public void testPutAndGetKey() throws OzoneException, IOException {
public void testPutAndGetKey() throws Exception {
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAscii(1);
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
getMultiPartKey(delimiter)));
}
static void runTestPutAndGetKey(PutHelper helper)
throws OzoneException, IOException {
static void runTestPutAndGetKey(PutHelper helper) throws Exception {
final OzoneRestClient client = helper.client;
String keyName = helper.putKey();
String keyName = helper.putKey().getKeyName();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
final String newFileName1 = helper.dir + "/"
final String newFileName1 = helper.dir + "/"
+ OzoneUtils.getRequestID().toLowerCase();
final String newFileName2 = helper.dir + "/"
final String newFileName2 = helper.dir + "/"
+ OzoneUtils.getRequestID().toLowerCase();
Path newPath1 = Paths.get(newFileName1);
@ -322,54 +342,51 @@ public class TestKeys {
client.getKey(helper.getVol().getVolumeName(),
helper.getBucket().getBucketName(), keyName, newPath2);
FileInputStream original = new FileInputStream(helper.getFile());
FileInputStream downloaded1 = new FileInputStream(newPath1.toFile());
FileInputStream downloaded2 = new FileInputStream(newPath1.toFile());
try (FileInputStream original = new FileInputStream(helper.getFile());
FileInputStream downloaded1 = new FileInputStream(newPath1.toFile());
FileInputStream downloaded2 = new FileInputStream(newPath1.toFile())) {
String originalHash = DigestUtils.sha256Hex(original);
String downloadedHash1 = DigestUtils.sha256Hex(downloaded1);
String downloadedHash2 = DigestUtils.sha256Hex(downloaded2);
String originalHash = DigestUtils.sha256Hex(original);
String downloadedHash1 = DigestUtils.sha256Hex(downloaded1);
String downloadedHash2 = DigestUtils.sha256Hex(downloaded2);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash1);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash2);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash1);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash2);
// test new get key with invalid volume/bucket name
try {
client.getKey("invalid-volume", helper.getBucket().getBucketName(),
keyName, newPath1);
fail("Get key should have thrown " + "when using invalid volume name.");
} catch (OzoneException e) {
GenericTestUtils
.assertExceptionContains(Status.KEY_NOT_FOUND.toString(), e);
}
// test new get key with invalid volume/bucket name
try {
client.getKey("invalid-volume",
helper.getBucket().getBucketName(), keyName, newPath1);
fail("Get key should have thrown "
+ "when using invalid volume name.");
} catch (OzoneException e) {
GenericTestUtils.assertExceptionContains(
Status.KEY_NOT_FOUND.toString(), e);
}
try {
client.getKey(helper.getVol().getVolumeName(),
"invalid-bucket", keyName, newPath1);
fail("Get key should have thrown "
+ "when using invalid bucket name.");
} catch (OzoneException e) {
GenericTestUtils.assertExceptionContains(
Status.KEY_NOT_FOUND.toString(), e);
try {
client.getKey(helper.getVol().getVolumeName(), "invalid-bucket",
keyName, newPath1);
fail("Get key should have thrown " + "when using invalid bucket name.");
} catch (OzoneException e) {
GenericTestUtils.assertExceptionContains(
Status.KEY_NOT_FOUND.toString(), e);
}
}
}
@Test
public void testPutAndDeleteKey() throws OzoneException, IOException {
public void testPutAndDeleteKey() throws Exception {
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAscii(1);
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
getMultiPartKey(delimiter)));
}
static void runTestPutAndDeleteKey(PutHelper helper)
throws OzoneException, IOException {
String keyName = helper.putKey();
static void runTestPutAndDeleteKey(PutHelper helper) throws Exception {
String keyName = helper.putKey().getKeyName();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
helper.getBucket().deleteKey(keyName);
@ -384,16 +401,14 @@ public class TestKeys {
}
@Test
public void testPutAndListKey()
throws OzoneException, IOException, ParseException {
public void testPutAndListKey() throws Exception {
runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAscii(1);
runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
getMultiPartKey(delimiter)));
}
static void runTestPutAndListKey(PutHelper helper)
throws OzoneException, IOException, ParseException {
static void runTestPutAndListKey(PutHelper helper) throws Exception {
final OzoneRestClient client = helper.client;
helper.putKey();
assertNotNull(helper.getBucket());
@ -473,17 +488,15 @@ public class TestKeys {
}
@Test
public void testGetKeyInfo()
throws OzoneException, IOException, ParseException {
public void testGetKeyInfo() throws Exception {
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAscii(1);
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,
getMultiPartKey(delimiter)));
}
static void runTestGetKeyInfo(PutHelper helper)
throws OzoneException, ParseException {
String keyName = helper.putKey();
static void runTestGetKeyInfo(PutHelper helper) throws Exception {
String keyName = helper.putKey().getKeyName();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
@ -500,4 +513,170 @@ public class TestKeys {
(OzoneUtils.formatDate(keyInfo.getObjectInfo().getModifiedOn())
/ 1000) >= (currentTime / 1000));
}
// Volume, bucket, keys info that helps for test create/delete keys.
private static class BucketKeys {
private Map<Pair<String, String>, List<String>> buckets;
BucketKeys() {
buckets = Maps.newHashMap();
}
void addKey(String volume, String bucket, String key) {
// check if this bucket exists
for (Map.Entry<Pair<String, String>, List<String>> entry :
buckets.entrySet()) {
if (entry.getKey().getValue().equals(bucket)) {
entry.getValue().add(key);
return;
}
}
// bucket not exist
Pair<String, String> newBucket = new ImmutablePair(volume, bucket);
List<String> keyList = Lists.newArrayList();
keyList.add(key);
buckets.put(newBucket, keyList);
}
Set<Pair<String, String>> getAllBuckets() {
return buckets.keySet();
}
List<String> getBucketKeys(String bucketName) {
for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
.entrySet()) {
if (entry.getKey().getValue().equals(bucketName)) {
return entry.getValue();
}
}
return Lists.newArrayList();
}
int totalNumOfKeys() {
int count = 0;
for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
.entrySet()) {
count += entry.getValue().size();
}
return count;
}
}
private int countKsmKeys(KeySpaceManager ksm) throws IOException {
int totalCount = 0;
List<KsmVolumeArgs> volumes =
ksm.listAllVolumes(null, null, Integer.MAX_VALUE);
for (KsmVolumeArgs volume : volumes) {
List<KsmBucketInfo> buckets =
ksm.listBuckets(volume.getVolume(), null, null, Integer.MAX_VALUE);
for (KsmBucketInfo bucket : buckets) {
List<KsmKeyInfo> keys = ksm.listKeys(bucket.getVolumeName(),
bucket.getBucketName(), null, null, Integer.MAX_VALUE);
totalCount += keys.size();
}
}
return totalCount;
}
@Test
public void testDeleteKey() throws Exception {
KeySpaceManager ksm = ozoneCluster.getKeySpaceManager();
// To avoid interference from other test cases,
// we collect number of existing keys at the beginning
int numOfExistedKeys = countKsmKeys(ksm);
// Keep tracking bucket keys info while creating them
PutHelper helper = new PutHelper(ozoneRestClient, path);
BucketKeys bucketKeys = new BucketKeys();
for (int i = 0; i < 20; i++) {
KsmKeyArgs keyArgs = helper.putKey();
bucketKeys.addKey(keyArgs.getVolumeName(), keyArgs.getBucketName(),
keyArgs.getKeyName());
}
// There should be 20 keys in the buckets we just created.
Assert.assertEquals(20, bucketKeys.totalNumOfKeys());
int numOfCreatedKeys = 0;
OzoneContainer cm = ozoneCluster.getDataNodes().get(0)
.getOzoneContainerManager();
// Expected to delete chunk file list.
List<File> expectedChunkFiles = Lists.newArrayList();
// Iterate over all buckets, and list all keys in each bucket,
// count the total number of created keys.
Set<Pair<String, String>> buckets = bucketKeys.getAllBuckets();
for (Pair<String, String> buk : buckets) {
List<KsmKeyInfo> createdKeys =
ksm.listKeys(buk.getKey(), buk.getValue(), null, null, 20);
// Memorize chunks that has been created,
// so we can verify actual deletions at DN side later.
for (KsmKeyInfo keyInfo : createdKeys) {
List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList();
for (KsmKeyLocationInfo location : locations) {
String containerName = location.getContainerName();
KeyData keyData = new KeyData(containerName, location.getBlockID());
KeyData blockInfo = cm.getContainerManager()
.getKeyManager().getKey(keyData);
ContainerData containerData = cm.getContainerManager()
.readContainer(containerName);
File dataDir = ContainerUtils
.getDataDirectory(containerData).toFile();
for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
File chunkFile = dataDir.toPath()
.resolve(chunkInfo.getChunkName()).toFile();
System.out.println("Chunk File created: "
+ chunkFile.getAbsolutePath());
Assert.assertTrue(chunkFile.exists());
expectedChunkFiles.add(chunkFile);
}
}
}
numOfCreatedKeys += createdKeys.size();
}
// Ensure all keys are created.
Assert.assertEquals(20, numOfCreatedKeys);
// Ensure all keys are visible from KSM.
// Total number should be numOfCreated + numOfExisted
Assert.assertEquals(20 + numOfExistedKeys, countKsmKeys(ksm));
// Delete 10 keys
int delCount = 20;
Set<Pair<String, String>> allBuckets = bucketKeys.getAllBuckets();
for (Pair<String, String> bucketInfo : allBuckets) {
List<String> bks = bucketKeys.getBucketKeys(bucketInfo.getValue());
for (String keyName : bks) {
if (delCount > 0) {
KsmKeyArgs arg =
new KsmKeyArgs.Builder().setVolumeName(bucketInfo.getKey())
.setBucketName(bucketInfo.getValue()).setKeyName(keyName)
.build();
ksm.deleteKey(arg);
delCount--;
}
}
}
// It should be pretty quick that keys are removed from KSM namespace,
// because actual deletion happens in async mode.
GenericTestUtils.waitFor(() -> {
try {
int num = countKsmKeys(ksm);
return num == (numOfExistedKeys);
} catch (IOException e) {
return false;
}
}, 1000, 10000);
// It might take a while until all blocks are actually deleted,
// verify all chunk files created earlier are removed from disk.
GenericTestUtils.waitFor(
() -> expectedChunkFiles.stream().allMatch(file -> !file.exists()),
1000, 60000);
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.web.client;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
@ -28,10 +27,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.net.URISyntaxException;
import java.text.ParseException;
import static org.apache.hadoop.ozone.web.client.TestKeys.*;
/** The same as {@link TestKeys} except that this test is Ratis enabled. */
@ -59,7 +54,7 @@ public class TestKeysRatis {
}
@Test
public void testPutKey() throws OzoneException {
public void testPutKey() throws Exception {
runTestPutKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAlphanumeric(1);
runTestPutKey(new PutHelper(ozoneRestClient, path,
@ -67,8 +62,7 @@ public class TestKeysRatis {
}
@Test
public void testPutAndGetKeyWithDnRestart()
throws OzoneException, IOException, URISyntaxException {
public void testPutAndGetKeyWithDnRestart() throws Exception {
runTestPutAndGetKeyWithDnRestart(
new PutHelper(ozoneRestClient, path), suite.getCluster());
String delimiter = RandomStringUtils.randomAlphanumeric(1);
@ -78,7 +72,7 @@ public class TestKeysRatis {
}
@Test
public void testPutAndGetKey() throws OzoneException, IOException {
public void testPutAndGetKey() throws Exception {
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAlphanumeric(1);
runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
@ -86,7 +80,7 @@ public class TestKeysRatis {
}
@Test
public void testPutAndDeleteKey() throws OzoneException, IOException {
public void testPutAndDeleteKey() throws Exception {
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAlphanumeric(1);
runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
@ -94,8 +88,7 @@ public class TestKeysRatis {
}
@Test
public void testPutAndListKey()
throws OzoneException, IOException, ParseException {
public void testPutAndListKey() throws Exception {
runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAlphanumeric(1);
runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
@ -103,8 +96,7 @@ public class TestKeysRatis {
}
@Test
public void testGetKeyInfo()
throws OzoneException, IOException, ParseException {
public void testGetKeyInfo() throws Exception {
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
String delimiter = RandomStringUtils.randomAlphanumeric(1);
runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,