HDFS-11781. Ozone: KSM: Add deleteKey. Contributed by Yuanbo Liu.
This commit is contained in:
parent
c7dd72e2fe
commit
d50c8de46c
|
@ -146,4 +146,12 @@ public interface KeySpaceManagerProtocol {
|
||||||
*/
|
*/
|
||||||
KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
|
KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes an existing key.
|
||||||
|
*
|
||||||
|
* @param args the args of the key.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void deleteKey(KsmKeyArgs args) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -445,6 +445,33 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
||||||
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
|
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes an existing key.
|
||||||
|
*
|
||||||
|
* @param args the args of the key.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deleteKey(KsmKeyArgs args) throws IOException {
|
||||||
|
LocateKeyRequest.Builder req = LocateKeyRequest.newBuilder();
|
||||||
|
KeyArgs keyArgs = KeyArgs.newBuilder()
|
||||||
|
.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setKeyName(args.getKeyName()).build();
|
||||||
|
req.setKeyArgs(keyArgs);
|
||||||
|
|
||||||
|
final LocateKeyResponse resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.deleteKey(NULL_RPC_CONTROLLER, req.build());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
if (resp.getStatus() != Status.OK) {
|
||||||
|
throw new IOException("Delete key failed, error:" +
|
||||||
|
resp.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the proxy object underlying this protocol translator.
|
* Return the proxy object underlying this protocol translator.
|
||||||
*
|
*
|
||||||
|
|
|
@ -155,7 +155,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
|
||||||
@Override
|
@Override
|
||||||
public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
|
public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Preconditions.checkArgument(keys == null || keys.isEmpty(),
|
Preconditions.checkArgument(keys != null && !keys.isEmpty(),
|
||||||
"keys to be deleted cannot be null or empty");
|
"keys to be deleted cannot be null or empty");
|
||||||
DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
|
DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
|
||||||
.newBuilder()
|
.newBuilder()
|
||||||
|
|
|
@ -218,7 +218,7 @@ message KeyArgs {
|
||||||
required string volumeName = 1;
|
required string volumeName = 1;
|
||||||
required string bucketName = 2;
|
required string bucketName = 2;
|
||||||
required string keyName = 3;
|
required string keyName = 3;
|
||||||
required uint64 dataSize = 4;
|
optional uint64 dataSize = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message KeyInfo {
|
message KeyInfo {
|
||||||
|
@ -317,4 +317,10 @@ service KeySpaceManagerService {
|
||||||
*/
|
*/
|
||||||
rpc lookupKey(LocateKeyRequest)
|
rpc lookupKey(LocateKeyRequest)
|
||||||
returns(LocateKeyResponse);
|
returns(LocateKeyResponse);
|
||||||
|
|
||||||
|
/**
|
||||||
|
Delete an existing key.
|
||||||
|
*/
|
||||||
|
rpc deleteKey(LocateKeyRequest)
|
||||||
|
returns(LocateKeyResponse);
|
||||||
}
|
}
|
|
@ -38,6 +38,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numBucketModifies;
|
private @Metric MutableCounterLong numBucketModifies;
|
||||||
private @Metric MutableCounterLong numKeyAllocate;
|
private @Metric MutableCounterLong numKeyAllocate;
|
||||||
private @Metric MutableCounterLong numKeyLookup;
|
private @Metric MutableCounterLong numKeyLookup;
|
||||||
|
private @Metric MutableCounterLong numKeyDeletes;
|
||||||
|
|
||||||
// Failure Metrics
|
// Failure Metrics
|
||||||
private @Metric MutableCounterLong numVolumeCreateFails;
|
private @Metric MutableCounterLong numVolumeCreateFails;
|
||||||
|
@ -50,6 +51,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numBucketModifyFails;
|
private @Metric MutableCounterLong numBucketModifyFails;
|
||||||
private @Metric MutableCounterLong numKeyAllocateFails;
|
private @Metric MutableCounterLong numKeyAllocateFails;
|
||||||
private @Metric MutableCounterLong numKeyLookupFails;
|
private @Metric MutableCounterLong numKeyLookupFails;
|
||||||
|
private @Metric MutableCounterLong numKeyDeleteFails;
|
||||||
|
|
||||||
public KSMMetrics() {
|
public KSMMetrics() {
|
||||||
}
|
}
|
||||||
|
@ -141,6 +143,14 @@ public class KSMMetrics {
|
||||||
numKeyLookupFails.incr();
|
numKeyLookupFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumKeyDeleteFails() {
|
||||||
|
numKeyDeleteFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumKeyDeletes() {
|
||||||
|
numKeyDeletes.incr();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreates() {
|
public long getNumVolumeCreates() {
|
||||||
return numVolumeCreates.value();
|
return numVolumeCreates.value();
|
||||||
|
@ -240,4 +250,14 @@ public class KSMMetrics {
|
||||||
public long getNumKeyLookupFails() {
|
public long getNumKeyLookupFails() {
|
||||||
return numKeyLookupFails.value();
|
return numKeyLookupFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumKeyDeletes() {
|
||||||
|
return numKeyDeletes.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumKeyDeletesFails() {
|
||||||
|
return numKeyDeleteFails.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,4 +52,15 @@ public interface KeyManager {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
|
KsmKeyInfo lookupKey(KsmKeyArgs args) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes an object by an object key. The key will be immediately removed
|
||||||
|
* from KSM namespace and become invisible to clients. The object data
|
||||||
|
* will be removed in async manner that might retain for some time.
|
||||||
|
*
|
||||||
|
* @param args the args of the key provided by client.
|
||||||
|
* @throws IOException if specified key doesn't exist or
|
||||||
|
* some other I/O errors while deleting an object.
|
||||||
|
*/
|
||||||
|
void deleteKey(KsmKeyArgs args) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,14 +20,19 @@ import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
|
||||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
import org.iq80.leveldb.DBException;
|
import org.iq80.leveldb.DBException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implementation of keyManager.
|
* Implementation of keyManager.
|
||||||
|
@ -139,4 +144,40 @@ public class KeyManagerImpl implements KeyManager {
|
||||||
metadataManager.writeLock().unlock();
|
metadataManager.writeLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteKey(KsmKeyArgs args) throws IOException {
|
||||||
|
Preconditions.checkNotNull(args);
|
||||||
|
KsmKeyInfo keyInfo = lookupKey(args);
|
||||||
|
|
||||||
|
metadataManager.writeLock().lock();
|
||||||
|
String volumeName = args.getVolumeName();
|
||||||
|
String bucketName = args.getBucketName();
|
||||||
|
String keyName = args.getKeyName();
|
||||||
|
try {
|
||||||
|
List<DeleteBlockResult> resultList =
|
||||||
|
scmBlockClient.deleteBlocks(
|
||||||
|
Collections.singleton(keyInfo.getBlockID()));
|
||||||
|
if (resultList.size() != 1) {
|
||||||
|
throw new KSMException("Delete result size from SCM is wrong",
|
||||||
|
ResultCodes.FAILED_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resultList.get(0).getResult() == Result.success) {
|
||||||
|
byte[] objectKey = metadataManager.getDBKeyForKey(
|
||||||
|
volumeName, bucketName, keyName);
|
||||||
|
metadataManager.deleteKey(objectKey);
|
||||||
|
} else {
|
||||||
|
throw new KSMException("Cannot delete key from SCM",
|
||||||
|
ResultCodes.FAILED_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
} catch (DBException ex) {
|
||||||
|
LOG.error(String.format("Delete key failed for volume:%s "
|
||||||
|
+ "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
|
||||||
|
throw new KSMException(ex.getMessage(), ex,
|
||||||
|
ResultCodes.FAILED_INTERNAL_ERROR);
|
||||||
|
} finally {
|
||||||
|
metadataManager.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -460,6 +460,23 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes an existing key.
|
||||||
|
*
|
||||||
|
* @param args - attributes of the key.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deleteKey(KsmKeyArgs args) throws IOException {
|
||||||
|
try {
|
||||||
|
metrics.incNumKeyDeletes();
|
||||||
|
keyManager.deleteKey(args);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
metrics.incNumKeyDeleteFails();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets bucket property from args.
|
* Sets bucket property from args.
|
||||||
* @param args - BucketArgs.
|
* @param args - BucketArgs.
|
||||||
|
|
|
@ -105,6 +105,13 @@ public interface MetadataManager {
|
||||||
*/
|
*/
|
||||||
byte[] getDBKeyForKey(String volume, String bucket, String key);
|
byte[] getDBKeyForKey(String volume, String bucket, String key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the key from DB.
|
||||||
|
*
|
||||||
|
* @param key - key name
|
||||||
|
*/
|
||||||
|
void deleteKey(byte[] key);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given a volume, check if it is empty, i.e there are no buckets inside it.
|
* Given a volume, check if it is empty, i.e there are no buckets inside it.
|
||||||
* @param volume - Volume name
|
* @param volume - Volume name
|
||||||
|
|
|
@ -114,6 +114,16 @@ public class MetadataManagerImpl implements MetadataManager {
|
||||||
return DFSUtil.string2Bytes(keyKeyString);
|
return DFSUtil.string2Bytes(keyKeyString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the key on Metadata DB.
|
||||||
|
*
|
||||||
|
* @param key - key name
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void deleteKey(byte[] key) {
|
||||||
|
store.delete(key);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the read lock used on Metadata DB.
|
* Returns the read lock used on Metadata DB.
|
||||||
* @return readLock
|
* @return readLock
|
||||||
|
|
|
@ -313,4 +313,24 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
|
||||||
}
|
}
|
||||||
return resp.build();
|
return resp.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LocateKeyResponse deleteKey(RpcController controller,
|
||||||
|
LocateKeyRequest request) throws ServiceException {
|
||||||
|
LocateKeyResponse.Builder resp =
|
||||||
|
LocateKeyResponse.newBuilder();
|
||||||
|
try {
|
||||||
|
KeyArgs keyArgs = request.getKeyArgs();
|
||||||
|
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
|
||||||
|
.setVolumeName(keyArgs.getVolumeName())
|
||||||
|
.setBucketName(keyArgs.getBucketName())
|
||||||
|
.setKeyName(keyArgs.getKeyName())
|
||||||
|
.build();
|
||||||
|
impl.deleteKey(ksmKeyArgs);
|
||||||
|
resp.setStatus(Status.OK);
|
||||||
|
} catch (IOException e) {
|
||||||
|
resp.setStatus(exceptionToResponseStatus(e));
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@ package org.apache.hadoop.ozone.web.handlers;
|
||||||
*/
|
*/
|
||||||
public class KeyArgs extends BucketArgs {
|
public class KeyArgs extends BucketArgs {
|
||||||
private String key;
|
private String key;
|
||||||
private boolean delete;
|
|
||||||
private String hash;
|
private String hash;
|
||||||
private long size;
|
private long size;
|
||||||
|
|
||||||
|
@ -40,6 +39,17 @@ public class KeyArgs extends BucketArgs {
|
||||||
this.key = objectName;
|
this.key = objectName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor for Key Args.
|
||||||
|
*
|
||||||
|
* @param objectName - Key
|
||||||
|
* @param args - Bucket Args
|
||||||
|
*/
|
||||||
|
public KeyArgs(String objectName, BucketArgs args) {
|
||||||
|
super(args);
|
||||||
|
this.key = objectName;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get Key Name.
|
* Get Key Name.
|
||||||
*
|
*
|
||||||
|
@ -49,24 +59,6 @@ public class KeyArgs extends BucketArgs {
|
||||||
return this.key;
|
return this.key;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if this request is for a Delete key.
|
|
||||||
*
|
|
||||||
* @return boolean
|
|
||||||
*/
|
|
||||||
public boolean isDelete() {
|
|
||||||
return delete;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the key request as a Delete Request.
|
|
||||||
*
|
|
||||||
* @param delete bool, indicating if this is a delete request
|
|
||||||
*/
|
|
||||||
public void setDelete(boolean delete) {
|
|
||||||
this.delete = delete;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Computed File hash.
|
* Computed File hash.
|
||||||
*
|
*
|
||||||
|
|
|
@ -398,7 +398,12 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteKey(KeyArgs args) throws IOException, OzoneException {
|
public void deleteKey(KeyArgs args) throws IOException, OzoneException {
|
||||||
throw new UnsupportedOperationException("deleteKey not implemented");
|
KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
|
||||||
|
.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setKeyName(args.getKeyName())
|
||||||
|
.build();
|
||||||
|
keySpaceManagerClient.deleteKey(keyArgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -47,9 +47,10 @@ import org.junit.rules.ExpectedException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.List;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Key Space Manager operation in distributed handler scenario.
|
* Test Key Space Manager operation in distributed handler scenario.
|
||||||
|
@ -450,4 +451,54 @@ public class TestKeySpaceManager {
|
||||||
Assert.assertEquals(1 + numKeyLookupFails,
|
Assert.assertEquals(1 + numKeyLookupFails,
|
||||||
ksmMetrics.getNumKeyLookupFails());
|
ksmMetrics.getNumKeyLookupFails());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test delete keys for ksm.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws OzoneException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeleteKey() throws IOException, OzoneException {
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String keyName = "key" + RandomStringUtils.randomNumeric(5);
|
||||||
|
long numKeyDeletes = ksmMetrics.getNumKeyDeletes();
|
||||||
|
long numKeyDeleteFails = ksmMetrics.getNumKeyDeletesFails();
|
||||||
|
|
||||||
|
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
|
||||||
|
createVolumeArgs.setUserName(userName);
|
||||||
|
createVolumeArgs.setAdminName(adminName);
|
||||||
|
storageHandler.createVolume(createVolumeArgs);
|
||||||
|
|
||||||
|
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
|
||||||
|
storageHandler.createBucket(bucketArgs);
|
||||||
|
|
||||||
|
KeyArgs keyArgs = new KeyArgs(keyName, bucketArgs);
|
||||||
|
keyArgs.setSize(100);
|
||||||
|
String dataString = RandomStringUtils.randomAscii(100);
|
||||||
|
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
|
||||||
|
stream.write(dataString.getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
storageHandler.deleteKey(keyArgs);
|
||||||
|
Assert.assertEquals(1 + numKeyDeletes, ksmMetrics.getNumKeyDeletes());
|
||||||
|
|
||||||
|
// Check the block key in SCM, make sure it's deleted.
|
||||||
|
Set<String> keys = new HashSet<>();
|
||||||
|
keys.add(keyArgs.getResourceName());
|
||||||
|
exception.expect(IOException.class);
|
||||||
|
exception.expectMessage("Specified block key does not exist");
|
||||||
|
cluster.getStorageContainerManager().getBlockLocations(keys);
|
||||||
|
|
||||||
|
// Delete the key again to test deleting non-existing key.
|
||||||
|
exception.expect(IOException.class);
|
||||||
|
exception.expectMessage("KEY_NOT_FOUND");
|
||||||
|
storageHandler.deleteKey(keyArgs);
|
||||||
|
Assert.assertEquals(1 + numKeyDeleteFails,
|
||||||
|
ksmMetrics.getNumKeyDeletesFails());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue