HDFS-11782. Ozone: KSM: Add listKey. Contributed by Yiqun Lin.

This commit is contained in:
Xiaoyu Yao 2017-06-19 15:55:58 -07:00 committed by Owen O'Malley
parent 721e7a6418
commit e73d285567
14 changed files with 428 additions and 2 deletions

View File

@ -184,4 +184,29 @@ KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
List<KsmBucketInfo> listBuckets(String volumeName,
String startBucketName, String bucketPrefix, int maxNumOfBuckets)
throws IOException;
/**
* Returns a list of keys represented by {@link KsmKeyInfo}
* in the given bucket. Argument volumeName, bucketName is required,
* others are optional.
*
* @param volumeName
* the name of the volume.
* @param bucketName
* the name of the bucket.
* @param startKeyName
* the start key name, only the keys whose name is
* after this value will be included in the result.
* @param keyPrefix
* key name prefix, only the keys whose name has
* this prefix will be included in the result.
* @param maxKeys
* the maximum number of keys to return. It ensures
* the size of the result will not exceed this limit.
* @return a list of keys.
* @throws IOException
*/
List<KsmKeyInfo> listKeys(String volumeName,
String bucketName, String startKeyName, String keyPrefix, int maxKeys)
throws IOException;
}

View File

@ -78,6 +78,8 @@
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto
@ -547,6 +549,45 @@ public void deleteBucket(String volume, String bucket) throws IOException {
}
}
/**
* List keys in a bucket.
*/
@Override
public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
String startKey, String prefix, int maxKeys) throws IOException {
List<KsmKeyInfo> keys = new ArrayList<>();
ListKeysRequest.Builder reqBuilder = ListKeysRequest.newBuilder();
reqBuilder.setVolumeName(volumeName);
reqBuilder.setBucketName(bucketName);
reqBuilder.setCount(maxKeys);
if (startKey != null) {
reqBuilder.setStartKey(startKey);
}
if (prefix != null) {
reqBuilder.setPrefix(prefix);
}
ListKeysRequest request = reqBuilder.build();
final ListKeysResponse resp;
try {
resp = rpcProxy.listKeys(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() == Status.OK) {
keys.addAll(
resp.getKeyInfoList().stream()
.map(KsmKeyInfo::getFromProtobuf)
.collect(Collectors.toList()));
return keys;
} else {
throw new IOException("List Keys failed, error: "
+ resp.getStatus());
}
}
/**
* Return the proxy object underlying this protocol translator.

View File

@ -111,6 +111,11 @@ public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
*/
public static final int MAX_LISTBUCKETS_SIZE = 1024;
/**
* Max number of keys returned per list keys operation.
*/
public static final int MAX_LISTKEYS_SIZE = 1024;
private OzoneConsts() {
// Never Constructed
}

View File

@ -267,6 +267,18 @@ message DeleteBucketResponse {
required Status status = 1;
}
message ListKeysRequest {
required string volumeName = 1;
required string bucketName = 2;
optional string startKey = 3;
optional string prefix = 4;
optional int32 count = 5;
}
message ListKeysResponse {
required Status status = 1;
repeated KeyInfo keyInfo = 2;
}
/**
The KSM service that takes care of Ozone namespace.
@ -355,4 +367,10 @@ service KeySpaceManagerService {
*/
rpc listBuckets(ListBucketsRequest)
returns(ListBucketsResponse);
/**
List Keys.
*/
rpc listKeys(ListKeysRequest)
returns(ListKeysResponse);
}

View File

@ -41,6 +41,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numKeyLookup;
private @Metric MutableCounterLong numKeyDeletes;
private @Metric MutableCounterLong numBucketLists;
private @Metric MutableCounterLong numKeyLists;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
@ -56,6 +57,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numKeyLookupFails;
private @Metric MutableCounterLong numKeyDeleteFails;
private @Metric MutableCounterLong numBucketListFails;
private @Metric MutableCounterLong numKeyListFails;
public KSMMetrics() {
}
@ -107,6 +109,10 @@ public void incNumBucketLists() {
numBucketLists.incr();
}
public void incNumKeyLists() {
numKeyLists.incr();
}
public void incNumVolumeCreateFails() {
numVolumeCreateFails.incr();
}
@ -171,6 +177,10 @@ public void incNumBucketListFails() {
numBucketListFails.incr();
}
public void incNumKeyListFails() {
numKeyListFails.incr();
}
@VisibleForTesting
public long getNumVolumeCreates() {
return numVolumeCreates.value();
@ -221,6 +231,11 @@ public long getNumBucketLists() {
return numBucketLists.value();
}
@VisibleForTesting
public long getNumKeyLists() {
return numKeyLists.value();
}
@VisibleForTesting
public long getNumVolumeCreateFails() {
return numVolumeCreateFails.value();
@ -300,4 +315,9 @@ public long getNumKeyDeletesFails() {
public long getNumBucketListFails() {
return numBucketListFails.value();
}
@VisibleForTesting
public long getNumKeyListFails() {
return numKeyListFails.value();
}
}

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import java.io.IOException;
import java.util.List;
/**
* Handles key level commands.
@ -63,4 +64,28 @@ public interface KeyManager {
* some other I/O errors while deleting an object.
*/
void deleteKey(KsmKeyArgs args) throws IOException;
/**
* Returns a list of keys represented by {@link KsmKeyInfo}
* in the given bucket.
*
* @param volumeName
* the name of the volume.
* @param bucketName
* the name of the bucket.
* @param startKey
* the start key name, only the keys whose name is
* after this value will be included in the result.
* @param keyPrefix
* key name prefix, only the keys whose name has
* this prefix will be included in the result.
* @param maxKeys
* the maximum number of keys to return. It ensures
* the size of the result will not exceed this limit.
* @return a list of keys.
* @throws IOException
*/
List<KsmKeyInfo> listKeys(String volumeName,
String bucketName, String startKey, String keyPrefix, int maxKeys)
throws IOException;
}

View File

@ -180,4 +180,19 @@ public void deleteKey(KsmKeyArgs args) throws IOException {
metadataManager.writeLock().unlock();
}
}
@Override
public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
String startKey, String keyPrefix, int maxKeys) throws IOException {
Preconditions.checkNotNull(volumeName);
Preconditions.checkNotNull(bucketName);
metadataManager.readLock().lock();
try {
return metadataManager.listKeys(volumeName, bucketName,
startKey, keyPrefix, maxKeys);
} finally {
metadataManager.readLock().unlock();
}
}
}

View File

@ -473,6 +473,19 @@ public void deleteKey(KsmKeyArgs args) throws IOException {
}
}
@Override
public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
String startKey, String keyPrefix, int maxKeys) throws IOException {
try {
metrics.incNumKeyLists();
return keyManager.listKeys(volumeName, bucketName,
startKey, keyPrefix, maxKeys);
} catch (IOException ex) {
metrics.incNumKeyListFails();
throw ex;
}
}
/**
* Sets bucket property from args.
* @param args - BucketArgs.

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import java.io.IOException;
import java.util.List;
@ -157,4 +158,28 @@ void batchPutDelete(List<Map.Entry<byte[], byte[]>> putList,
*/
List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
String bucketPrefix, int maxNumOfBuckets) throws IOException;
/**
* Returns a list of keys represented by {@link KsmKeyInfo}
* in the given bucket.
*
* @param volumeName
* the name of the volume.
* @param bucketName
* the name of the bucket.
* @param startKey
* the start key name, only the keys whose name is
* after this value will be included in the result.
* @param keyPrefix
* key name prefix, only the keys whose name has
* this prefix will be included in the result.
* @param maxKeys
* the maximum number of keys to return. It ensures
* the size of the result will not exceed this limit.
* @return a list of keys.
* @throws IOException
*/
List<KsmKeyInfo> listKeys(String volumeName,
String bucketName, String startKey, String keyPrefix, int maxKeys)
throws IOException;
}

View File

@ -19,11 +19,13 @@
import com.google.common.base.Strings;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
@ -125,6 +127,13 @@ private String getBucketKeyPrefix(String volume, String bucket) {
return sb.toString();
}
private String getKeyKeyPrefix(String volume, String bucket, String key) {
String keyStr = getBucketKeyPrefix(volume, bucket);
keyStr = Strings.isNullOrEmpty(key) ? keyStr + OzoneConsts.KSM_KEY_PREFIX
: keyStr + OzoneConsts.KSM_KEY_PREFIX + key;
return keyStr;
}
@Override
public byte[] getDBKeyForKey(String volume, String bucket, String key) {
String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
@ -306,4 +315,40 @@ public List<KsmBucketInfo> listBuckets(final String volumeName,
}
return result;
}
@Override
public List<KsmKeyInfo> listKeys(String volumeName, String bucketName,
String startKey, String keyPrefix, int maxKeys) throws IOException {
List<KsmKeyInfo> result = new ArrayList<>();
if (Strings.isNullOrEmpty(volumeName)) {
throw new KSMException("Volume name is required.",
ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
if (Strings.isNullOrEmpty(bucketName)) {
throw new KSMException("Bucket name is required.",
ResultCodes.FAILED_BUCKET_NOT_FOUND);
}
byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
if (store.get(bucketNameBytes) == null) {
throw new KSMException("Bucket " + bucketName + " not found.",
ResultCodes.FAILED_BUCKET_NOT_FOUND);
}
byte[] startKeyBytes = null;
if (!Strings.isNullOrEmpty(startKey)) {
startKeyBytes = getDBKeyForKey(volumeName, bucketName, startKey);
}
LevelDBKeyFilter filter =
new KeyPrefixFilter(getKeyKeyPrefix(volumeName, bucketName, keyPrefix));
List<Map.Entry<byte[], byte[]>> rangeResult =
store.getRangeKVs(startKeyBytes, maxKeys, filter);
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
KsmKeyInfo info = KsmKeyInfo.getFromProtobuf(
KeyInfo.parseFrom(entry.getValue()));
result.add(info);
}
return result;
}
}

View File

@ -76,7 +76,8 @@
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.ListKeysResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
@ -381,4 +382,26 @@ public ListBucketsResponse listBuckets(
}
return resp.build();
}
@Override
public ListKeysResponse listKeys(RpcController controller,
ListKeysRequest request) throws ServiceException {
ListKeysResponse.Builder resp =
ListKeysResponse.newBuilder();
try {
List<KsmKeyInfo> keys = impl.listKeys(
request.getVolumeName(),
request.getBucketName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(KsmKeyInfo key : keys) {
resp.addKeyInfo(key.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
}

View File

@ -190,6 +190,14 @@ public void sort() {
Collections.sort(keyList);
}
/**
* Add a new key to the list of keys.
* @param keyInfo - key Info
*/
public void addKey(KeyInfo keyInfo){
this.keyList.add(keyInfo);
}
/**
* This class allows us to create custom filters for the Json serialization.
*/

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.storage.ChunkInputStream;
@ -433,7 +434,49 @@ public void deleteKey(KeyArgs args) throws IOException, OzoneException {
@Override
public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
throw new UnsupportedOperationException("listKeys not implemented");
ListKeys result = new ListKeys();
UserArgs userArgs = args.getArgs();
if (userArgs instanceof BucketArgs) {
BucketArgs bucketArgs = (BucketArgs) userArgs;
if (Strings.isNullOrEmpty(bucketArgs.getVolumeName())) {
throw new IllegalArgumentException("Illegal argument,"
+ " volume name cannot be null or empty.");
}
if (Strings.isNullOrEmpty(bucketArgs.getBucketName())) {
throw new IllegalArgumentException("Illegal argument,"
+ " bucket name cannot be null or empty.");
}
int maxNumOfKeys = args.getMaxKeys();
if (maxNumOfKeys <= 0 ||
maxNumOfKeys > OzoneConsts.MAX_LISTKEYS_SIZE) {
throw new IllegalArgumentException(
String.format("Illegal max number of keys specified,"
+ " the value must be in range (0, %d], actual : %d.",
OzoneConsts.MAX_LISTKEYS_SIZE, maxNumOfKeys));
}
List<KsmKeyInfo> keys=
keySpaceManagerClient.listKeys(bucketArgs.getVolumeName(),
bucketArgs.getBucketName(),
args.getPrevKey(), args.getPrefix(), args.getMaxKeys());
// Convert the result for the web layer.
for (KsmKeyInfo info : keys) {
KeyInfo tempInfo = new KeyInfo();
tempInfo.setVersion(0);
tempInfo.setKeyName(info.getKeyName());
tempInfo.setSize(info.getDataSize());
result.addKey(tempInfo);
}
return result;
} else {
throw new IllegalArgumentException("Illegal argument provided,"
+ " expecting BucketArgs type but met "
+ userArgs.getClass().getSimpleName());
}
}
private XceiverClientSpi getContainer(String containerName)

View File

@ -34,12 +34,15 @@
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -720,4 +723,121 @@ public void testListBuckets() throws IOException, OzoneException {
.contains(Status.VOLUME_NOT_FOUND.name()));
}
}
/**
* Test list keys.
* @throws IOException
* @throws OzoneException
*/
@Test
public void testListKeys() throws IOException, OzoneException {
ListKeys result = null;
ListArgs listKeyArgs = null;
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
// Write 20 keys in bucket.
int numKeys = 20;
String keyName = "Key";
KeyArgs keyArgs = null;
for (int i = 0; i < numKeys; i++) {
if (i % 2 == 0) {
// Create /volume/bucket/aKey[0,2,4,...,18] in bucket.
keyArgs = new KeyArgs("a" + keyName + i, bucketArgs);
} else {
// Create /volume/bucket/bKey[1,3,5,...,19] in bucket.
keyArgs = new KeyArgs("b" + keyName + i, bucketArgs);
}
keyArgs.setSize(4096);
// Just for testing list keys call, so no need to write real data.
OutputStream stream = storageHandler.newKeyWriter(keyArgs);
stream.close();
}
// List all keys in bucket.
bucketArgs = new BucketArgs(volumeName, bucketName, userArgs);
listKeyArgs = new ListArgs(bucketArgs, null, 100, null);
result = storageHandler.listKeys(listKeyArgs);
Assert.assertEquals(numKeys, result.getKeyList().size());
List<KeyInfo> allKeys = result.getKeyList().stream()
.filter(item -> item.getSize() == 4096)
.collect(Collectors.toList());
// List keys with prefix "aKey".
listKeyArgs = new ListArgs(bucketArgs, "aKey", 100, null);
result = storageHandler.listKeys(listKeyArgs);
Assert.assertEquals(numKeys / 2, result.getKeyList().size());
Assert.assertTrue(result.getKeyList().stream()
.allMatch(entry -> entry.getKeyName().startsWith("aKey")));
// List a certain number of keys.
listKeyArgs = new ListArgs(bucketArgs, null, 3, null);
result = storageHandler.listKeys(listKeyArgs);
Assert.assertEquals(3, result.getKeyList().size());
Assert.assertEquals("aKey0",
result.getKeyList().get(0).getKeyName());
Assert.assertEquals("aKey10",
result.getKeyList().get(1).getKeyName());
Assert.assertEquals("aKey12",
result.getKeyList().get(2).getKeyName());
// List a certain number of keys from the startKey.
listKeyArgs = new ListArgs(bucketArgs, null, 2, "bKey1");
result = storageHandler.listKeys(listKeyArgs);
Assert.assertEquals(2, result.getKeyList().size());
Assert.assertEquals("bKey1",
result.getKeyList().get(0).getKeyName());
Assert.assertEquals("bKey11",
result.getKeyList().get(1).getKeyName());
// Provide an invalid key name as start key.
listKeyArgs = new ListArgs(bucketArgs, null, 100, "invalid_start_key");
try {
storageHandler.listKeys(listKeyArgs);
Assert.fail("Expecting an error when the given start"
+ " key name is invalid.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
Status.INTERNAL_ERROR.name(), e);
}
// Provide an invalid maxKeys argument.
try {
listKeyArgs = new ListArgs(bucketArgs, null, -1, null);
storageHandler.listBuckets(listKeyArgs);
Assert.fail("Expecting an error when the given"
+ " maxKeys argument is invalid.");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains(
String.format("the value must be in range (0, %d]",
OzoneConsts.MAX_LISTKEYS_SIZE), e);
}
// Provide an invalid bucket name.
bucketArgs = new BucketArgs("invalid_bucket", createVolumeArgs);
try {
listKeyArgs = new ListArgs(bucketArgs, null, numKeys, null);
storageHandler.listKeys(listKeyArgs);
Assert.fail(
"Expecting an error when the given bucket name is invalid.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
Status.BUCKET_NOT_FOUND.name(), e);
}
}
}