HDFS-11779. Ozone: KSM: add listBuckets. Contributed by Weiwei Yang.
This commit is contained in:
parent
bacd1188f1
commit
0c37e05b10
|
@ -161,4 +161,27 @@ public interface KeySpaceManagerProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void deleteBucket(String volume, String bucket) throws IOException;
|
void deleteBucket(String volume, String bucket) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of buckets represented by {@link KsmBucketInfo}
|
||||||
|
* in the given volume. Argument volumeName is required, others
|
||||||
|
* are optional.
|
||||||
|
*
|
||||||
|
* @param volumeName
|
||||||
|
* the name of the volume.
|
||||||
|
* @param startBucketName
|
||||||
|
* the start bucket name, only the buckets whose name is
|
||||||
|
* after this value will be included in the result.
|
||||||
|
* @param bucketPrefix
|
||||||
|
* bucket name prefix, only the buckets whose name has
|
||||||
|
* this prefix will be included in the result.
|
||||||
|
* @param maxNumOfBuckets
|
||||||
|
* the maximum number of buckets to return. It ensures
|
||||||
|
* the size of the result will not exceed this limit.
|
||||||
|
* @return a list of buckets.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
List<KsmBucketInfo> listBuckets(String volumeName,
|
||||||
|
String startBucketName, String bucketPrefix, int maxNumOfBuckets)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,10 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
|
.KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
|
.KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.VolumeInfo;
|
.KeySpaceManagerProtocolProtos.VolumeInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -84,6 +88,8 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client side implementation of KeySpaceManagerProtocol.
|
* The client side implementation of KeySpaceManagerProtocol.
|
||||||
|
@ -396,6 +402,49 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List buckets in a volume.
|
||||||
|
*
|
||||||
|
* @param volumeName
|
||||||
|
* @param startKey
|
||||||
|
* @param prefix
|
||||||
|
* @param count
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<KsmBucketInfo> listBuckets(String volumeName,
|
||||||
|
String startKey, String prefix, int count) throws IOException {
|
||||||
|
List<KsmBucketInfo> buckets = new ArrayList<>();
|
||||||
|
ListBucketsRequest.Builder reqBuilder = ListBucketsRequest.newBuilder();
|
||||||
|
reqBuilder.setVolumeName(volumeName);
|
||||||
|
reqBuilder.setCount(count);
|
||||||
|
if (startKey != null) {
|
||||||
|
reqBuilder.setStartKey(startKey);
|
||||||
|
}
|
||||||
|
if (prefix != null) {
|
||||||
|
reqBuilder.setPrefix(prefix);
|
||||||
|
}
|
||||||
|
ListBucketsRequest request = reqBuilder.build();
|
||||||
|
final ListBucketsResponse resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.listBuckets(NULL_RPC_CONTROLLER, request);
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resp.getStatus() == Status.OK) {
|
||||||
|
buckets.addAll(
|
||||||
|
resp.getBucketInfoList().stream()
|
||||||
|
.map(KsmBucketInfo::getFromProtobuf)
|
||||||
|
.collect(Collectors.toList()));
|
||||||
|
return buckets;
|
||||||
|
} else {
|
||||||
|
throw new IOException("List Buckets failed, error: "
|
||||||
|
+ resp.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a block for a key, then use the returned meta info to talk to data
|
* Allocate a block for a key, then use the returned meta info to talk to data
|
||||||
* node to actually write the key.
|
* node to actually write the key.
|
||||||
|
|
|
@ -106,6 +106,11 @@ public final class OzoneConsts {
|
||||||
*/
|
*/
|
||||||
public static final long MAX_QUOTA_IN_BYTES = 1024L * 1024 * TB;
|
public static final long MAX_QUOTA_IN_BYTES = 1024L * 1024 * TB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Max number of keys returned per list buckets operation.
|
||||||
|
*/
|
||||||
|
public static final int MAX_LISTBUCKETS_SIZE = 1024;
|
||||||
|
|
||||||
private OzoneConsts() {
|
private OzoneConsts() {
|
||||||
// Never Constructed
|
// Never Constructed
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,9 +210,19 @@ message InfoBucketRequest {
|
||||||
message InfoBucketResponse {
|
message InfoBucketResponse {
|
||||||
required Status status = 1;
|
required Status status = 1;
|
||||||
optional BucketInfo bucketInfo = 2;
|
optional BucketInfo bucketInfo = 2;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message ListBucketsRequest {
|
||||||
|
required string volumeName = 1;
|
||||||
|
optional string startKey = 2;
|
||||||
|
optional string prefix = 3;
|
||||||
|
optional int32 count = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ListBucketsResponse {
|
||||||
|
required Status status = 1;
|
||||||
|
repeated BucketInfo bucketInfo = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message KeyArgs {
|
message KeyArgs {
|
||||||
required string volumeName = 1;
|
required string volumeName = 1;
|
||||||
|
@ -339,4 +349,10 @@ service KeySpaceManagerService {
|
||||||
*/
|
*/
|
||||||
rpc deleteBucket(DeleteBucketRequest)
|
rpc deleteBucket(DeleteBucketRequest)
|
||||||
returns (DeleteBucketResponse);
|
returns (DeleteBucketResponse);
|
||||||
|
|
||||||
|
/**
|
||||||
|
List Buckets.
|
||||||
|
*/
|
||||||
|
rpc listBuckets(ListBucketsRequest)
|
||||||
|
returns(ListBucketsResponse);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* BucketManager handles all the bucket level operations.
|
* BucketManager handles all the bucket level operations.
|
||||||
|
@ -52,4 +53,27 @@ public interface BucketManager {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
void deleteBucket(String volumeName, String bucketName) throws IOException;
|
void deleteBucket(String volumeName, String bucketName) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of buckets represented by {@link KsmBucketInfo}
|
||||||
|
* in the given volume.
|
||||||
|
*
|
||||||
|
* @param volumeName
|
||||||
|
* Required parameter volume name determines buckets in which volume
|
||||||
|
* to return.
|
||||||
|
* @param startBucket
|
||||||
|
* Optional start bucket name parameter indicating where to start
|
||||||
|
* the bucket listing from.
|
||||||
|
* @param bucketPrefix
|
||||||
|
* Optional start key parameter, restricting the response to buckets
|
||||||
|
* that begin with the specified name.
|
||||||
|
* @param maxNumOfBuckets
|
||||||
|
* The maximum number of buckets to return. It ensures
|
||||||
|
* the size of the result will not exceed this limit.
|
||||||
|
* @return a list of buckets.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
List<KsmBucketInfo> listBuckets(String volumeName,
|
||||||
|
String startBucket, String bucketPrefix, int maxNumOfBuckets)
|
||||||
|
throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -277,4 +277,20 @@ public class BucketManagerImpl implements BucketManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<KsmBucketInfo> listBuckets(String volumeName,
|
||||||
|
String startBucket, String bucketPrefix, int maxNumOfBuckets)
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(volumeName);
|
||||||
|
metadataManager.readLock().lock();
|
||||||
|
try {
|
||||||
|
return metadataManager.listBuckets(
|
||||||
|
volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
|
||||||
|
} finally {
|
||||||
|
metadataManager.readLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numKeyAllocate;
|
private @Metric MutableCounterLong numKeyAllocate;
|
||||||
private @Metric MutableCounterLong numKeyLookup;
|
private @Metric MutableCounterLong numKeyLookup;
|
||||||
private @Metric MutableCounterLong numKeyDeletes;
|
private @Metric MutableCounterLong numKeyDeletes;
|
||||||
|
private @Metric MutableCounterLong numBucketLists;
|
||||||
|
|
||||||
// Failure Metrics
|
// Failure Metrics
|
||||||
private @Metric MutableCounterLong numVolumeCreateFails;
|
private @Metric MutableCounterLong numVolumeCreateFails;
|
||||||
|
@ -54,6 +55,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numKeyAllocateFails;
|
private @Metric MutableCounterLong numKeyAllocateFails;
|
||||||
private @Metric MutableCounterLong numKeyLookupFails;
|
private @Metric MutableCounterLong numKeyLookupFails;
|
||||||
private @Metric MutableCounterLong numKeyDeleteFails;
|
private @Metric MutableCounterLong numKeyDeleteFails;
|
||||||
|
private @Metric MutableCounterLong numBucketListFails;
|
||||||
|
|
||||||
public KSMMetrics() {
|
public KSMMetrics() {
|
||||||
}
|
}
|
||||||
|
@ -101,6 +103,10 @@ public class KSMMetrics {
|
||||||
numBucketDeletes.incr();
|
numBucketDeletes.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBucketLists() {
|
||||||
|
numBucketLists.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumVolumeCreateFails() {
|
public void incNumVolumeCreateFails() {
|
||||||
numVolumeCreateFails.incr();
|
numVolumeCreateFails.incr();
|
||||||
}
|
}
|
||||||
|
@ -161,6 +167,10 @@ public class KSMMetrics {
|
||||||
numKeyDeletes.incr();
|
numKeyDeletes.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBucketListFails() {
|
||||||
|
numBucketListFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreates() {
|
public long getNumVolumeCreates() {
|
||||||
return numVolumeCreates.value();
|
return numVolumeCreates.value();
|
||||||
|
@ -206,6 +216,11 @@ public class KSMMetrics {
|
||||||
return numBucketDeletes.value();
|
return numBucketDeletes.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBucketLists() {
|
||||||
|
return numBucketLists.value();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreateFails() {
|
public long getNumVolumeCreateFails() {
|
||||||
return numVolumeCreateFails.value();
|
return numVolumeCreateFails.value();
|
||||||
|
@ -280,4 +295,9 @@ public class KSMMetrics {
|
||||||
public long getNumKeyDeletesFails() {
|
public long getNumKeyDeletesFails() {
|
||||||
return numKeyDeleteFails.value();
|
return numKeyDeleteFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBucketListFails() {
|
||||||
|
return numBucketListFails.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,6 +383,23 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<KsmBucketInfo> listBuckets(String volumeName,
|
||||||
|
String startKey, String prefix, int maxNumOfBuckets)
|
||||||
|
throws IOException {
|
||||||
|
try {
|
||||||
|
metrics.incNumBucketLists();
|
||||||
|
return bucketManager.listBuckets(volumeName,
|
||||||
|
startKey, prefix, maxNumOfBuckets);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
metrics.incNumBucketListFails();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the bucket information.
|
* Gets the bucket information.
|
||||||
*
|
*
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -133,4 +135,26 @@ public interface MetadataManager {
|
||||||
* @return true if the bucket is empty
|
* @return true if the bucket is empty
|
||||||
*/
|
*/
|
||||||
boolean isBucketEmpty(String volume, String bucket) throws IOException;
|
boolean isBucketEmpty(String volume, String bucket) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a list of buckets represented by {@link KsmBucketInfo}
|
||||||
|
* in the given volume.
|
||||||
|
*
|
||||||
|
* @param volumeName
|
||||||
|
* the name of the volume. This argument is required,
|
||||||
|
* this method returns buckets in this given volume.
|
||||||
|
* @param startBucket
|
||||||
|
* the start bucket name. Only the buckets whose name is
|
||||||
|
* after this value will be included in the result.
|
||||||
|
* @param bucketPrefix
|
||||||
|
* bucket name prefix. Only the buckets whose name has
|
||||||
|
* this prefix will be included in the result.
|
||||||
|
* @param maxNumOfBuckets
|
||||||
|
* the maximum number of buckets to return. It ensures
|
||||||
|
* the size of the result will not exceed this limit.
|
||||||
|
* @return a list of buckets.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
List<KsmBucketInfo> listBuckets(String volumeName, String startBucket,
|
||||||
|
String bucketPrefix, int maxNumOfBuckets) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,10 +16,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.ksm;
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
|
import org.apache.hadoop.utils.LevelDBKeyFilters.KeyPrefixFilter;
|
||||||
|
import org.apache.hadoop.utils.LevelDBKeyFilters.LevelDBKeyFilter;
|
||||||
import org.apache.hadoop.utils.LevelDBStore;
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
import org.iq80.leveldb.DBIterator;
|
import org.iq80.leveldb.DBIterator;
|
||||||
import org.iq80.leveldb.Options;
|
import org.iq80.leveldb.Options;
|
||||||
|
@ -28,6 +35,7 @@ import org.iq80.leveldb.WriteBatch;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
@ -106,6 +114,17 @@ public class MetadataManagerImpl implements MetadataManager {
|
||||||
return DFSUtil.string2Bytes(bucketKeyString);
|
return DFSUtil.string2Bytes(bucketKeyString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getBucketKeyPrefix(String volume, String bucket) {
|
||||||
|
StringBuffer sb = new StringBuffer();
|
||||||
|
sb.append(OzoneConsts.KSM_VOLUME_PREFIX)
|
||||||
|
.append(volume)
|
||||||
|
.append(OzoneConsts.KSM_BUCKET_PREFIX);
|
||||||
|
if (!Strings.isNullOrEmpty(bucket)) {
|
||||||
|
sb.append(bucket);
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] getDBKeyForKey(String volume, String bucket, String key) {
|
public byte[] getDBKeyForKey(String volume, String bucket, String key) {
|
||||||
String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
|
String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
|
||||||
|
@ -252,4 +271,39 @@ public class MetadataManagerImpl implements MetadataManager {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public List<KsmBucketInfo> listBuckets(final String volumeName,
|
||||||
|
final String startBucket, final String bucketPrefix,
|
||||||
|
final int maxNumOfBuckets) throws IOException {
|
||||||
|
List<KsmBucketInfo> result = new ArrayList<>();
|
||||||
|
if (Strings.isNullOrEmpty(volumeName)) {
|
||||||
|
throw new KSMException("Volume name is required.",
|
||||||
|
ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] volumeNameBytes = getVolumeKey(volumeName);
|
||||||
|
if (store.get(volumeNameBytes) == null) {
|
||||||
|
throw new KSMException("Volume " + volumeName + " not found.",
|
||||||
|
ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] startKeyBytes = null;
|
||||||
|
if (!Strings.isNullOrEmpty(startBucket)) {
|
||||||
|
startKeyBytes = getBucketKey(volumeName, startBucket);
|
||||||
|
}
|
||||||
|
LevelDBKeyFilter filter =
|
||||||
|
new KeyPrefixFilter(getBucketKeyPrefix(volumeName, bucketPrefix));
|
||||||
|
List<Map.Entry<byte[], byte[]>> rangeResult =
|
||||||
|
store.getRangeKVs(startKeyBytes, maxNumOfBuckets, filter);
|
||||||
|
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
|
||||||
|
KsmBucketInfo info = KsmBucketInfo.getFromProtobuf(
|
||||||
|
BucketInfo.parseFrom(entry.getValue()));
|
||||||
|
result.add(info);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,11 +72,17 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.ListVolumeRequest;
|
.KeySpaceManagerProtocolProtos.ListVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.ListVolumeResponse;
|
.KeySpaceManagerProtocolProtos.ListVolumeResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ListBucketsRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.ListBucketsResponse;
|
||||||
|
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.Status;
|
.KeySpaceManagerProtocolProtos.Status;
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class is the server-side translator that forwards requests received on
|
* This class is the server-side translator that forwards requests received on
|
||||||
|
@ -353,4 +359,26 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
|
||||||
}
|
}
|
||||||
return resp.build();
|
return resp.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ListBucketsResponse listBuckets(
|
||||||
|
RpcController controller, ListBucketsRequest request)
|
||||||
|
throws ServiceException {
|
||||||
|
ListBucketsResponse.Builder resp =
|
||||||
|
ListBucketsResponse.newBuilder();
|
||||||
|
try {
|
||||||
|
List<KsmBucketInfo> buckets = impl.listBuckets(
|
||||||
|
request.getVolumeName(),
|
||||||
|
request.getStartKey(),
|
||||||
|
request.getPrefix(),
|
||||||
|
request.getCount());
|
||||||
|
for(KsmBucketInfo bucket : buckets) {
|
||||||
|
resp.addBucketInfo(bucket.getProtobuf());
|
||||||
|
}
|
||||||
|
resp.setStatus(Status.OK);
|
||||||
|
} catch (IOException e) {
|
||||||
|
resp.setStatus(exceptionToResponseStatus(e));
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.web.storage;
|
package org.apache.hadoop.ozone.web.storage;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
||||||
.ContainerProtos.ChunkInfo;
|
.ContainerProtos.ChunkInfo;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
||||||
|
@ -52,6 +53,7 @@ import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.ListArgs;
|
import org.apache.hadoop.ozone.web.handlers.ListArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
||||||
|
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||||
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
import org.apache.hadoop.ozone.web.response.ListVolumes;
|
||||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||||
|
@ -293,7 +295,43 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
@Override
|
@Override
|
||||||
public ListBuckets listBuckets(ListArgs args)
|
public ListBuckets listBuckets(ListArgs args)
|
||||||
throws IOException, OzoneException {
|
throws IOException, OzoneException {
|
||||||
throw new UnsupportedOperationException("listBuckets not implemented");
|
ListBuckets result = new ListBuckets();
|
||||||
|
UserArgs userArgs = args.getArgs();
|
||||||
|
if (userArgs instanceof VolumeArgs) {
|
||||||
|
VolumeArgs va = (VolumeArgs) userArgs;
|
||||||
|
if (Strings.isNullOrEmpty(va.getVolumeName())) {
|
||||||
|
throw new IllegalArgumentException("Illegal argument,"
|
||||||
|
+ " volume name cannot be null or empty.");
|
||||||
|
}
|
||||||
|
|
||||||
|
int maxNumOfKeys = args.getMaxKeys();
|
||||||
|
if (maxNumOfKeys <= 0 ||
|
||||||
|
maxNumOfKeys > OzoneConsts.MAX_LISTBUCKETS_SIZE) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format("Illegal max number of keys specified,"
|
||||||
|
+ " the value must be in range (0, %d], actual : %d.",
|
||||||
|
OzoneConsts.MAX_LISTBUCKETS_SIZE, maxNumOfKeys));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<KsmBucketInfo> buckets =
|
||||||
|
keySpaceManagerClient.listBuckets(va.getVolumeName(),
|
||||||
|
args.getPrevKey(), args.getPrefix(), args.getMaxKeys());
|
||||||
|
|
||||||
|
// Convert the result for the web layer.
|
||||||
|
for (KsmBucketInfo bucketInfo : buckets) {
|
||||||
|
BucketInfo bk = new BucketInfo();
|
||||||
|
bk.setVolumeName(bucketInfo.getVolumeName());
|
||||||
|
bk.setBucketName(bucketInfo.getBucketName());
|
||||||
|
bk.setStorageType(bucketInfo.getStorageType());
|
||||||
|
bk.setAcls(bucketInfo.getAcls());
|
||||||
|
result.addBucket(bk);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException("Illegal argument provided,"
|
||||||
|
+ " expecting VolumeArgs type but met "
|
||||||
|
+ userArgs.getClass().getSimpleName());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -266,7 +266,6 @@ public class LevelDBStore implements Closeable {
|
||||||
snapShot = db.getSnapshot();
|
snapShot = db.getSnapshot();
|
||||||
ReadOptions readOptions = new ReadOptions().snapshot(snapShot);
|
ReadOptions readOptions = new ReadOptions().snapshot(snapShot);
|
||||||
dbIter = db.iterator(readOptions);
|
dbIter = db.iterator(readOptions);
|
||||||
dbIter.seekToFirst();
|
|
||||||
if (startKey == null) {
|
if (startKey == null) {
|
||||||
dbIter.seekToFirst();
|
dbIter.seekToFirst();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -36,6 +36,10 @@ import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.Status;
|
||||||
|
import org.apache.hadoop.ozone.web.handlers.ListArgs;
|
||||||
|
import org.apache.hadoop.ozone.web.response.ListBuckets;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -51,6 +55,8 @@ import java.util.HashSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Key Space Manager operation in distributed handler scenario.
|
* Test Key Space Manager operation in distributed handler scenario.
|
||||||
|
@ -598,4 +604,120 @@ public class TestKeySpaceManager {
|
||||||
ksmMetrics.getNumKeyDeletesFails());
|
ksmMetrics.getNumKeyDeletesFails());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testListBuckets() throws IOException, OzoneException {
|
||||||
|
ListBuckets result = null;
|
||||||
|
ListArgs listBucketArgs = null;
|
||||||
|
|
||||||
|
// Create volume - volA.
|
||||||
|
final String volAname = "volA";
|
||||||
|
VolumeArgs volAArgs = new VolumeArgs(volAname, userArgs);
|
||||||
|
volAArgs.setUserName("userA");
|
||||||
|
volAArgs.setAdminName("adminA");
|
||||||
|
storageHandler.createVolume(volAArgs);
|
||||||
|
|
||||||
|
// Create 20 buckets in volA for tests.
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
// Create "/volA/aBucket_0" to "/volA/aBucket_9" buckets in volA volume.
|
||||||
|
BucketArgs aBuckets = new BucketArgs(volAname,
|
||||||
|
"aBucket_" + i, userArgs);
|
||||||
|
if(i % 3 == 0) {
|
||||||
|
aBuckets.setStorageType(StorageType.ARCHIVE);
|
||||||
|
} else {
|
||||||
|
aBuckets.setStorageType(StorageType.DISK);
|
||||||
|
}
|
||||||
|
storageHandler.createBucket(aBuckets);
|
||||||
|
|
||||||
|
// Create "/volA/bBucket_0" to "/volA/bBucket_9" buckets in volA volume.
|
||||||
|
BucketArgs bBuckets = new BucketArgs(volAname,
|
||||||
|
"bBucket_" + i, userArgs);
|
||||||
|
if(i % 3 == 0) {
|
||||||
|
bBuckets.setStorageType(StorageType.RAM_DISK);
|
||||||
|
} else {
|
||||||
|
bBuckets.setStorageType(StorageType.SSD);
|
||||||
|
}
|
||||||
|
storageHandler.createBucket(bBuckets);
|
||||||
|
}
|
||||||
|
|
||||||
|
VolumeArgs volArgs = new VolumeArgs(volAname, userArgs);
|
||||||
|
|
||||||
|
// List all buckets in volA.
|
||||||
|
listBucketArgs = new ListArgs(volArgs, null, 100, null);
|
||||||
|
result = storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.assertEquals(20, result.getBuckets().size());
|
||||||
|
List<BucketInfo> archiveBuckets = result.getBuckets().stream()
|
||||||
|
.filter(item -> item.getStorageType() == StorageType.ARCHIVE)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
Assert.assertEquals(4, archiveBuckets.size());
|
||||||
|
|
||||||
|
// List buckets with prefix "aBucket".
|
||||||
|
listBucketArgs = new ListArgs(volArgs, "aBucket", 100, null);
|
||||||
|
result = storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.assertEquals(10, result.getBuckets().size());
|
||||||
|
Assert.assertTrue(result.getBuckets().stream()
|
||||||
|
.allMatch(entry -> entry.getBucketName().startsWith("aBucket")));
|
||||||
|
|
||||||
|
// List a certain number of buckets.
|
||||||
|
listBucketArgs = new ListArgs(volArgs, null, 3, null);
|
||||||
|
result = storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.assertEquals(3, result.getBuckets().size());
|
||||||
|
Assert.assertEquals("aBucket_0",
|
||||||
|
result.getBuckets().get(0).getBucketName());
|
||||||
|
Assert.assertEquals("aBucket_1",
|
||||||
|
result.getBuckets().get(1).getBucketName());
|
||||||
|
Assert.assertEquals("aBucket_2",
|
||||||
|
result.getBuckets().get(2).getBucketName());
|
||||||
|
|
||||||
|
// List a certain number of buckets from the startKey.
|
||||||
|
listBucketArgs = new ListArgs(volArgs, null, 2, "bBucket_3");
|
||||||
|
result = storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.assertEquals(2, result.getBuckets().size());
|
||||||
|
Assert.assertEquals("bBucket_3",
|
||||||
|
result.getBuckets().get(0).getBucketName());
|
||||||
|
Assert.assertEquals("bBucket_4",
|
||||||
|
result.getBuckets().get(1).getBucketName());
|
||||||
|
|
||||||
|
// Provide an invalid bucket name as start key.
|
||||||
|
listBucketArgs = new ListArgs(volArgs, null, 100, "unknown_bucket_name");
|
||||||
|
try {
|
||||||
|
storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.fail("Expecting an error when the given bucket name is invalid.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.assertTrue(e instanceof IOException);
|
||||||
|
Assert.assertTrue(e.getMessage().contains(Status.INTERNAL_ERROR.name()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use all arguments.
|
||||||
|
listBucketArgs = new ListArgs(volArgs, "b", 5, "bBucket_8");
|
||||||
|
result = storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.assertEquals(2, result.getBuckets().size());
|
||||||
|
Assert.assertEquals("bBucket_8",
|
||||||
|
result.getBuckets().get(0).getBucketName());
|
||||||
|
Assert.assertEquals("bBucket_9",
|
||||||
|
result.getBuckets().get(1).getBucketName());
|
||||||
|
|
||||||
|
// Provide an invalid maxKeys argument.
|
||||||
|
try {
|
||||||
|
listBucketArgs = new ListArgs(volArgs, null, -1, null);
|
||||||
|
storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.fail("Expecting an error when the given"
|
||||||
|
+ " maxKeys argument is invalid.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.assertTrue(e.getMessage()
|
||||||
|
.contains(String.format("the value must be in range (0, %d]",
|
||||||
|
OzoneConsts.MAX_LISTBUCKETS_SIZE)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provide an invalid volume name.
|
||||||
|
VolumeArgs invalidVolArgs = new VolumeArgs("invalid_name", userArgs);
|
||||||
|
try {
|
||||||
|
listBucketArgs = new ListArgs(invalidVolArgs, null, 100, null);
|
||||||
|
storageHandler.listBuckets(listBucketArgs);
|
||||||
|
Assert.fail("Expecting an error when the given volume name is invalid.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.assertTrue(e instanceof IOException);
|
||||||
|
Assert.assertTrue(e.getMessage()
|
||||||
|
.contains(Status.VOLUME_NOT_FOUND.name()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue