HDFS-11779. Ozone: KSM: add listBuckets. Contributed by Weiwei Yang.

This commit is contained in:
Weiwei Yang 2017-06-08 17:38:48 +08:00
parent da8bc385a7
commit 23d7d613df
14 changed files with 438 additions and 3 deletions

View File

@ -161,4 +161,27 @@ public interface KeySpaceManagerProtocol {
* @throws IOException
*/
void deleteBucket(String volume, String bucket) throws IOException;
/**
* Returns a list of buckets represented by {@link KsmBucketInfo}
* in the given volume. Argument volumeName is required, others
* are optional.
*
* @param volumeName
* the name of the volume.
* @param startBucketName
* the start bucket name, only the buckets whose name is
* after this value will be included in the result.
* @param bucketPrefix
* bucket name prefix, only the buckets whose name has
* this prefix will be included in the result.
* @param maxNumOfBuckets
* the maximum number of buckets to return. It ensures
* the size of the result will not exceed this limit.
* @return a list of buckets.
* @throws IOException
*/
List<KsmBucketInfo> listBuckets(String volumeName,
String startBucketName, String bucketPrefix, int maxNumOfBuckets)
throws IOException;
}

View File

@ -74,6 +74,10 @@ import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto
@ -84,6 +88,8 @@ import org.apache.hadoop.ozone.protocol.proto
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.stream.Collectors;
/**
* The client side implementation of KeySpaceManagerProtocol.
@ -396,6 +402,49 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
}
}
/**
* List buckets in a volume.
*
* @param volumeName
* @param startKey
* @param prefix
* @param count
* @return
* @throws IOException
*/
@Override
public List<KsmBucketInfo> listBuckets(String volumeName,
String startKey, String prefix, int count) throws IOException {
List<KsmBucketInfo> buckets = new ArrayList<>();
ListBucketsRequest.Builder reqBuilder = ListBucketsRequest.newBuilder();
reqBuilder.setVolumeName(volumeName);
reqBuilder.setCount(count);
if (startKey != null) {
reqBuilder.setStartKey(startKey);
}
if (prefix != null) {
reqBuilder.setPrefix(prefix);
}
ListBucketsRequest request = reqBuilder.build();
final ListBucketsResponse resp;
try {
resp = rpcProxy.listBuckets(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() == Status.OK) {
buckets.addAll(
resp.getBucketInfoList().stream()
.map(KsmBucketInfo::getFromProtobuf)
.collect(Collectors.toList()));
return buckets;
} else {
throw new IOException("List Buckets failed, error: "
+ resp.getStatus());
}
}
/**
* Allocate a block for a key, then use the returned meta info to talk to data
* node to actually write the key.

View File

@ -106,6 +106,11 @@ public final class OzoneConsts {
*/
public static final long MAX_QUOTA_IN_BYTES = 1024L * 1024 * TB;
/**
* Max number of keys returned per list buckets operation.
*/
public static final int MAX_LISTBUCKETS_SIZE = 1024;
private OzoneConsts() {
// Never Constructed
}

View File

@ -210,9 +210,19 @@ message InfoBucketRequest {
message InfoBucketResponse {
required Status status = 1;
optional BucketInfo bucketInfo = 2;
}
message ListBucketsRequest {
required string volumeName = 1;
optional string startKey = 2;
optional string prefix = 3;
optional int32 count = 4;
}
message ListBucketsResponse {
required Status status = 1;
repeated BucketInfo bucketInfo = 2;
}
message KeyArgs {
required string volumeName = 1;
@ -339,4 +349,10 @@ service KeySpaceManagerService {
*/
rpc deleteBucket(DeleteBucketRequest)
returns (DeleteBucketResponse);
/**
List Buckets.
*/
rpc listBuckets(ListBucketsRequest)
returns(ListBucketsResponse);
}

View File

@ -20,6 +20,7 @@ import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import java.io.IOException;
import java.util.List;
/**
* BucketManager handles all the bucket level operations.
@ -52,4 +53,27 @@ public interface BucketManager {
* @throws IOException
*/
void deleteBucket(String volumeName, String bucketName) throws IOException;
/**
* Returns a list of buckets represented by {@link KsmBucketInfo}
* in the given volume.
*
* @param volumeName
* Required parameter volume name determines buckets in which volume
* to return.
* @param startBucket
* Optional start bucket name parameter indicating where to start
* the bucket listing from.
* @param bucketPrefix
* Optional start key parameter, restricting the response to buckets
* that begin with the specified name.
* @param maxNumOfBuckets
* The maximum number of buckets to return. It ensures
* the size of the result will not exceed this limit.
* @return a list of buckets.
* @throws IOException
*/
List<KsmBucketInfo> listBuckets(String volumeName,
String startBucket, String bucketPrefix, int maxNumOfBuckets)
throws IOException;
}

View File

@ -277,4 +277,20 @@ public class BucketManagerImpl implements BucketManager {
}
}
/**
* {@inheritDoc}
*/
@Override
public List<KsmBucketInfo> listBuckets(String volumeName,
String startBucket, String bucketPrefix, int maxNumOfBuckets)
throws IOException {
Preconditions.checkNotNull(volumeName);
metadataManager.readLock().lock();
try {
return metadataManager.listBuckets(
volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
} finally {
metadataManager.readLock().unlock();
}
}
}

View File

@ -40,6 +40,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numKeyAllocate;
private @Metric MutableCounterLong numKeyLookup;
private @Metric MutableCounterLong numKeyDeletes;
private @Metric MutableCounterLong numBucketLists;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
@ -54,6 +55,7 @@ public class KSMMetrics {
private @Metric MutableCounterLong numKeyAllocateFails;
private @Metric MutableCounterLong numKeyLookupFails;
private @Metric MutableCounterLong numKeyDeleteFails;
private @Metric MutableCounterLong numBucketListFails;
public KSMMetrics() {
}
@ -101,6 +103,10 @@ public class KSMMetrics {
numBucketDeletes.incr();
}
public void incNumBucketLists() {
numBucketLists.incr();
}
public void incNumVolumeCreateFails() {
numVolumeCreateFails.incr();
}
@ -161,6 +167,10 @@ public class KSMMetrics {
numKeyDeletes.incr();
}
public void incNumBucketListFails() {
numBucketListFails.incr();
}
@VisibleForTesting
public long getNumVolumeCreates() {
return numVolumeCreates.value();
@ -206,6 +216,11 @@ public class KSMMetrics {
return numBucketDeletes.value();
}
@VisibleForTesting
public long getNumBucketLists() {
return numBucketLists.value();
}
@VisibleForTesting
public long getNumVolumeCreateFails() {
return numVolumeCreateFails.value();
@ -280,4 +295,9 @@ public class KSMMetrics {
public long getNumKeyDeletesFails() {
return numKeyDeleteFails.value();
}
@VisibleForTesting
public long getNumBucketListFails() {
return numBucketListFails.value();
}
}

View File

@ -383,6 +383,23 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
}
}
/**
* {@inheritDoc}
*/
@Override
public List<KsmBucketInfo> listBuckets(String volumeName,
String startKey, String prefix, int maxNumOfBuckets)
throws IOException {
try {
metrics.incNumBucketLists();
return bucketManager.listBuckets(volumeName,
startKey, prefix, maxNumOfBuckets);
} catch (IOException ex) {
metrics.incNumBucketListFails();
throw ex;
}
}
/**
* Gets the bucket information.
*

View File

@ -16,6 +16,8 @@
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -133,4 +135,26 @@ public interface MetadataManager {
* @return true if the bucket is empty
*/
boolean isBucketEmpty(String volume, String bucket) throws IOException;
/**
* Returns a list of buckets represented by {@link KsmBucketInfo}
* in the given volume.
*
* @param volumeName
* the name of the volume. This argument is required,
* this method returns buckets in this given volume.
* @param startBucket
* the start bucket name. Only the buckets whose name is
* after this value will be included in the result.
* @param bucketPrefix
* bucket name prefix. Only the buckets whose name has
* this prefix will be included in the result.
* @param maxNumOfBuckets
* the maximum number of buckets to return. It ensures
* the size of the result will not exceed this limit.
* @return a list of buckets.
* @throws IOException
*/
List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
String bucketPrefix, int maxNumOfBuckets) throws IOException;
}

View File

@ -16,10 +16,17 @@
*/
package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Strings;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
@ -28,6 +35,7 @@ import org.iq80.leveldb.WriteBatch;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -106,6 +114,17 @@ public class MetadataManagerImpl implements MetadataManager {
return DFSUtil.string2Bytes(bucketKeyString);
}
private String getBucketKeyPrefix(String volume, String bucket) {
StringBuffer sb = new StringBuffer();
sb.append(OzoneConsts.KSM_VOLUME_PREFIX)
.append(volume)
.append(OzoneConsts.KSM_BUCKET_PREFIX);
if (!Strings.isNullOrEmpty(bucket)) {
sb.append(bucket);
}
return sb.toString();
}
@Override
public byte[] getDBKeyForKey(String volume, String bucket, String key) {
String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
@ -252,4 +271,39 @@ public class MetadataManagerImpl implements MetadataManager {
return true;
}
}
/**
* {@inheritDoc}
*/
@Override
public List<KsmBucketInfo> listBuckets(final String volumeName,
final String startBucket, final String bucketPrefix,
final int maxNumOfBuckets) throws IOException {
List<KsmBucketInfo> result = new ArrayList<>();
if (Strings.isNullOrEmpty(volumeName)) {
throw new KSMException("Volume name is required.",
ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
byte[] volumeNameBytes = getVolumeKey(volumeName);
if (store.get(volumeNameBytes) == null) {
throw new KSMException("Volume " + volumeName + " not found.",
ResultCodes.FAILED_VOLUME_NOT_FOUND);
}
byte[] startKeyBytes = null;
if (!Strings.isNullOrEmpty(startBucket)) {
startKeyBytes = getBucketKey(volumeName, startBucket);
}
LevelDBKeyFilter filter =
new KeyPrefixFilter(getBucketKeyPrefix(volumeName, bucketPrefix));
List<Map.Entry<byte[], byte[]>> rangeResult =
store.getRangeKVs(startKeyBytes, maxNumOfBuckets, filter);
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
KsmBucketInfo info = KsmBucketInfo.getFromProtobuf(
BucketInfo.parseFrom(entry.getValue()));
result.add(info);
}
return result;
}
}

View File

@ -72,11 +72,17 @@ import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import java.io.IOException;
import java.util.List;
/**
* This class is the server-side translator that forwards requests received on
@ -353,4 +359,26 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
}
return resp.build();
}
@Override
public ListBucketsResponse listBuckets(
RpcController controller, ListBucketsRequest request)
throws ServiceException {
ListBucketsResponse.Builder resp =
ListBucketsResponse.newBuilder();
try {
List<KsmBucketInfo> buckets = impl.listBuckets(
request.getVolumeName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(KsmBucketInfo bucket : buckets) {
resp.addBucketInfo(bucket.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.web.storage;
import com.google.common.base.Strings;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto
@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
@ -293,7 +295,43 @@ public final class DistributedStorageHandler implements StorageHandler {
@Override
public ListBuckets listBuckets(ListArgs args)
throws IOException, OzoneException {
throw new UnsupportedOperationException("listBuckets not implemented");
ListBuckets result = new ListBuckets();
UserArgs userArgs = args.getArgs();
if (userArgs instanceof VolumeArgs) {
VolumeArgs va = (VolumeArgs) userArgs;
if (Strings.isNullOrEmpty(va.getVolumeName())) {
throw new IllegalArgumentException("Illegal argument,"
+ " volume name cannot be null or empty.");
}
int maxNumOfKeys = args.getMaxKeys();
if (maxNumOfKeys <= 0 ||
maxNumOfKeys > OzoneConsts.MAX_LISTBUCKETS_SIZE) {
throw new IllegalArgumentException(
String.format("Illegal max number of keys specified,"
+ " the value must be in range (0, %d], actual : %d.",
OzoneConsts.MAX_LISTBUCKETS_SIZE, maxNumOfKeys));
}
List<KsmBucketInfo> buckets =
keySpaceManagerClient.listBuckets(va.getVolumeName(),
args.getPrevKey(), args.getPrefix(), args.getMaxKeys());
// Convert the result for the web layer.
for (KsmBucketInfo bucketInfo : buckets) {
BucketInfo bk = new BucketInfo();
bk.setVolumeName(bucketInfo.getVolumeName());
bk.setBucketName(bucketInfo.getBucketName());
bk.setStorageType(bucketInfo.getStorageType());
bk.setAcls(bucketInfo.getAcls());
result.addBucket(bk);
}
return result;
} else {
throw new IllegalArgumentException("Illegal argument provided,"
+ " expecting VolumeArgs type but met "
+ userArgs.getClass().getSimpleName());
}
}
@Override

View File

@ -266,7 +266,6 @@ public class LevelDBStore implements Closeable {
snapShot = db.getSnapshot();
ReadOptions readOptions = new ReadOptions().snapshot(snapShot);
dbIter = db.iterator(readOptions);
dbIter.seekToFirst();
if (startKey == null) {
dbIter.seekToFirst();
} else {

View File

@ -36,6 +36,10 @@ import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -51,6 +55,8 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.Random;
import java.util.Set;
import java.util.List;
import java.util.stream.Collectors;
/**
* Test Key Space Manager operation in distributed handler scenario.
@ -598,4 +604,120 @@ public class TestKeySpaceManager {
ksmMetrics.getNumKeyDeletesFails());
}
@Test(timeout = 60000)
public void testListBuckets() throws IOException, OzoneException {
ListBuckets result = null;
ListArgs listBucketArgs = null;
// Create volume - volA.
final String volAname = "volA";
VolumeArgs volAArgs = new VolumeArgs(volAname, userArgs);
volAArgs.setUserName("userA");
volAArgs.setAdminName("adminA");
storageHandler.createVolume(volAArgs);
// Create 20 buckets in volA for tests.
for (int i=0; i<10; i++) {
// Create "/volA/aBucket_0" to "/volA/aBucket_9" buckets in volA volume.
BucketArgs aBuckets = new BucketArgs(volAname,
"aBucket_" + i, userArgs);
if(i % 3 == 0) {
aBuckets.setStorageType(StorageType.ARCHIVE);
} else {
aBuckets.setStorageType(StorageType.DISK);
}
storageHandler.createBucket(aBuckets);
// Create "/volA/bBucket_0" to "/volA/bBucket_9" buckets in volA volume.
BucketArgs bBuckets = new BucketArgs(volAname,
"bBucket_" + i, userArgs);
if(i % 3 == 0) {
bBuckets.setStorageType(StorageType.RAM_DISK);
} else {
bBuckets.setStorageType(StorageType.SSD);
}
storageHandler.createBucket(bBuckets);
}
VolumeArgs volArgs = new VolumeArgs(volAname, userArgs);
// List all buckets in volA.
listBucketArgs = new ListArgs(volArgs, null, 100, null);
result = storageHandler.listBuckets(listBucketArgs);
Assert.assertEquals(20, result.getBuckets().size());
List<BucketInfo> archiveBuckets = result.getBuckets().stream()
.filter(item -> item.getStorageType() == StorageType.ARCHIVE)
.collect(Collectors.toList());
Assert.assertEquals(4, archiveBuckets.size());
// List buckets with prefix "aBucket".
listBucketArgs = new ListArgs(volArgs, "aBucket", 100, null);
result = storageHandler.listBuckets(listBucketArgs);
Assert.assertEquals(10, result.getBuckets().size());
Assert.assertTrue(result.getBuckets().stream()
.allMatch(entry -> entry.getBucketName().startsWith("aBucket")));
// List a certain number of buckets.
listBucketArgs = new ListArgs(volArgs, null, 3, null);
result = storageHandler.listBuckets(listBucketArgs);
Assert.assertEquals(3, result.getBuckets().size());
Assert.assertEquals("aBucket_0",
result.getBuckets().get(0).getBucketName());
Assert.assertEquals("aBucket_1",
result.getBuckets().get(1).getBucketName());
Assert.assertEquals("aBucket_2",
result.getBuckets().get(2).getBucketName());
// List a certain number of buckets from the startKey.
listBucketArgs = new ListArgs(volArgs, null, 2, "bBucket_3");
result = storageHandler.listBuckets(listBucketArgs);
Assert.assertEquals(2, result.getBuckets().size());
Assert.assertEquals("bBucket_3",
result.getBuckets().get(0).getBucketName());
Assert.assertEquals("bBucket_4",
result.getBuckets().get(1).getBucketName());
// Provide an invalid bucket name as start key.
listBucketArgs = new ListArgs(volArgs, null, 100, "unknown_bucket_name");
try {
storageHandler.listBuckets(listBucketArgs);
Assert.fail("Expecting an error when the given bucket name is invalid.");
} catch (Exception e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertTrue(e.getMessage().contains(Status.INTERNAL_ERROR.name()));
}
// Use all arguments.
listBucketArgs = new ListArgs(volArgs, "b", 5, "bBucket_8");
result = storageHandler.listBuckets(listBucketArgs);
Assert.assertEquals(2, result.getBuckets().size());
Assert.assertEquals("bBucket_8",
result.getBuckets().get(0).getBucketName());
Assert.assertEquals("bBucket_9",
result.getBuckets().get(1).getBucketName());
// Provide an invalid maxKeys argument.
try {
listBucketArgs = new ListArgs(volArgs, null, -1, null);
storageHandler.listBuckets(listBucketArgs);
Assert.fail("Expecting an error when the given"
+ " maxKeys argument is invalid.");
} catch (Exception e) {
Assert.assertTrue(e.getMessage()
.contains(String.format("the value must be in range (0, %d]",
OzoneConsts.MAX_LISTBUCKETS_SIZE)));
}
// Provide an invalid volume name.
VolumeArgs invalidVolArgs = new VolumeArgs("invalid_name", userArgs);
try {
listBucketArgs = new ListArgs(invalidVolArgs, null, 100, null);
storageHandler.listBuckets(listBucketArgs);
Assert.fail("Expecting an error when the given volume name is invalid.");
} catch (Exception e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertTrue(e.getMessage()
.contains(Status.VOLUME_NOT_FOUND.name()));
}
}
}