HDFS-10278. Ozone: Add paging support to list Volumes. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-04-12 18:08:48 -07:00
parent 33b51da8c5
commit eee7269748
19 changed files with 525 additions and 164 deletions

View File

@ -331,19 +331,21 @@ public class ContainerManagerImpl implements ContainerManager {
* time. It is possible that using this iteration you can miss certain
* container from the listing.
*
* @param prevKey - Previous Key Value or empty String.
* @param prefix - Return keys that match this prefix.
* @param count - how many to return
* @param prevKey - Previous Key Value or empty String.
* @param data - Actual containerData
* @throws IOException
*/
@Override
public void listContainer(String prevKey, long count,
public void listContainer(String prefix, long count, String prevKey,
List<ContainerData> data) throws IOException {
// TODO : Support list with Prefix and PrevKey
Preconditions.checkNotNull(data);
readLock();
try {
ConcurrentNavigableMap<String, ContainerStatus> map = null;
if (prevKey.length() == 0) {
if (prevKey == null || prevKey.isEmpty()) {
map = containerMap.tailMap(containerMap.firstKey(), true);
} else {
map = containerMap.tailMap(prevKey, false);

View File

@ -72,12 +72,14 @@ public interface ContainerManager extends RwLock {
/**
* As simple interface for container Iterations.
*
* @param prevKey - Starting KeyValue
* @param prefix - Return only values matching this prefix
* @param count - how many to return
* @param prevKey - Previous key - Server returns results from this point.
* @param data - Actual containerData
* @throws IOException
*/
void listContainer(String prevKey, long count, List<ContainerData> data)
void listContainer(String prefix, long count, String prevKey,
List<ContainerData> data)
throws IOException;
/**

View File

@ -20,16 +20,14 @@ package org.apache.hadoop.ozone.web.client;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.headers.Header;
import org.apache.hadoop.ozone.web.request.OzoneAcl;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.http.HttpEntity;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
@ -39,8 +37,6 @@ import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import javax.ws.rs.core.HttpHeaders;
@ -514,16 +510,4 @@ public class OzoneBucket {
}
}
}
/**
* used to fix the content-length issue with protocol class.
*/
private static class ContentLengthHeaderRemover implements
HttpRequestInterceptor {
@Override
public void process(HttpRequest request, HttpContext context)
throws IOException {
request.removeHeaders(HTTP.CONTENT_LEN);
}
}
}

View File

@ -193,15 +193,35 @@ public class OzoneClient implements Closeable {
* @param onBehalfOf - User Name of the user if it is not the caller. for
* example, an admin wants to list some other users
* volumes.
* @param prefix - Return only volumes that match this prefix.
* @param maxKeys - Maximum number of results to return, if the result set
* is smaller than requested size, it means that list is
* complete.
* @param prevKey - The last key that client got, server will continue
* returning results from that point.
* @return List of Volumes
* @throws OzoneException
*/
public List<OzoneVolume> listVolumes(String onBehalfOf)
throws OzoneException {
public List<OzoneVolume> listVolumes(String onBehalfOf, String prefix, int
maxKeys, OzoneVolume prevKey) throws OzoneException {
try {
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(endPointURI);
if (prefix != null) {
builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix);
}
if (maxKeys > 0) {
builder.addParameter(Header.OZONE_LIST_QUERY_MAXKEYS, Integer
.toString(maxKeys));
}
if (prevKey != null) {
builder.addParameter(Header.OZONE_LIST_QUERY_PREVKEY,
prevKey.getOwnerName() + "/" + prevKey.getVolumeName());
}
builder.setPath("/").build();
HttpGet httpget = getHttpGet(builder.toString());
@ -216,11 +236,64 @@ public class OzoneClient implements Closeable {
}
/**
* delete a given volume.
* List volumes of the current user or if onBehalfof is not null lists volume
* owned by that user. You need admin privilege to read other users volume
* lists.
*
* @param volumeName - volume to be deleted.
* @throws OzoneException - Ozone Exception
* @param onBehalfOf - Name of the user you want to get volume list
* @return - Volume list.
* @throws OzoneException
*/
public List<OzoneVolume> listVolumes(String onBehalfOf)
throws OzoneException {
return listVolumes(onBehalfOf, null, 1000, null);
}
/**
* List all volumes in a cluster. This can be invoked only by an Admin.
*
* @param prefix - Returns only volumes that match this prefix.
* @param maxKeys - Maximum niumber of keys to return
* @param prevKey - Last Ozone Volume from the last Iteration.
* @return List of Volumes
* @throws OzoneException
*/
public List<OzoneVolume> listAllVolumes(String prefix, int maxKeys,
OzoneVolume prevKey) throws OzoneException {
try {
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(endPointURI);
if (prefix != null) {
builder.addParameter(Header.OZONE_LIST_QUERY_PREFIX, prefix);
}
if (maxKeys > 0) {
builder.addParameter(Header.OZONE_LIST_QUERY_MAXKEYS, Integer
.toString(maxKeys));
}
if (prevKey != null) {
builder.addParameter(Header.OZONE_LIST_QUERY_PREVKEY,
prevKey.getOwnerName()+ "/" + prevKey.getVolumeName());
}
builder.addParameter(Header.OZONE_LIST_QUERY_ROOTSCAN, "true");
builder.setPath("/").build();
HttpGet httpget = getHttpGet(builder.toString());
return executeListVolume(httpget, httpClient);
} catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage());
}
}
/**
* delete a given volume.
*
* @param volumeName - volume to be deleted.
* @throws OzoneException - Ozone Exception
*/
public void deleteVolume(String volumeName) throws OzoneException {
try {
DefaultHttpClient httpClient = new DefaultHttpClient();

View File

@ -313,7 +313,8 @@ public abstract class BucketProcessTemplate {
Response getBucketKeysList(ListArgs args) throws IOException, OzoneException {
StorageHandler fs = StorageHandlerBuilder.getStorageHandler();
ListKeys objects = fs.listKeys(args);
return OzoneUtils.getResponse(args, HTTP_OK, objects.toJsonString());
return OzoneUtils.getResponse(args.getArgs(), HTTP_OK,
objects.toJsonString());
}
}

View File

@ -20,10 +20,12 @@ package org.apache.hadoop.ozone.web.handlers;
/**
* Supports listing keys with pagination.
*/
public class ListArgs extends BucketArgs {
private String startPage;
public class ListArgs<T extends UserArgs> {
private String prevKey;
private String prefix;
private int maxKeys;
private boolean rootScan;
private T args;
/**
* Constructor for ListArgs.
@ -31,14 +33,14 @@ public class ListArgs extends BucketArgs {
* @param args - BucketArgs
* @param prefix Prefix to start Query from
* @param maxKeys Max result set
* @param startPage - Page token
* @param prevKey - Page token
*/
public ListArgs(BucketArgs args, String prefix, int maxKeys,
String startPage) {
super(args);
public ListArgs(T args, String prefix, int maxKeys,
String prevKey) {
setArgs(args);
setPrefix(prefix);
setMaxKeys(maxKeys);
setStartPage(startPage);
setPrevKey(prevKey);
}
/**
@ -46,8 +48,9 @@ public class ListArgs extends BucketArgs {
*
* @param args - List Args
*/
public ListArgs(ListArgs args) {
this(args, args.getPrefix(), args.getMaxKeys(), args.getStartPage());
public ListArgs(T args, ListArgs listArgs) {
this(args, listArgs.getPrefix(), listArgs.getMaxKeys(),
listArgs.getPrevKey());
}
/**
@ -55,17 +58,17 @@ public class ListArgs extends BucketArgs {
*
* @return String
*/
public String getStartPage() {
return startPage;
public String getPrevKey() {
return prevKey;
}
/**
* Sets page token.
*
* @param startPage - Page token
* @param prevKey - Page token
*/
public void setStartPage(String startPage) {
this.startPage = startPage;
public void setPrevKey(String prevKey) {
this.prevKey = prevKey;
}
/**
@ -103,4 +106,37 @@ public class ListArgs extends BucketArgs {
public void setPrefix(String prefix) {
this.prefix = prefix;
}
/**
* Gets args.
* @return T
*/
public T getArgs() {
return args;
}
/**
* Sets args.
* @param args T
*/
public void setArgs(T args) {
this.args = args;
}
/**
* Checks if we are doing a rootScan.
* @return - RootScan.
*/
public boolean isRootScan() {
return rootScan;
}
/**
* Sets the RootScan property.
* @param rootScan - Boolean.
*/
public void setRootScan(boolean rootScan) {
this.rootScan = rootScan;
}
}

View File

@ -198,41 +198,74 @@ public class VolumeHandler implements Volume {
}
/**
* Returns Volume info. This API can be invoked either
* by admin or the owner
*
* @param volume - Storage Volume Name
* @param req - Http Req
* @param uriInfo - http URI
* @param headers - Http headers @return - Response
* Returns Volume info. This API can be invoked either by admin or the owner
*
* @param volume - Storage Volume Name
* @param info - Info attribute
* @param prefix - Prefix key
* @param maxKeys - Max results
* @param prevKey - PrevKey
* @param req - Http Req
* @param uriInfo - UriInfo.
* @param headers - Http headers
* @return
* @throws OzoneException
*/
@Override
public Response getVolumeInfo(String volume, final String info, Request req,
public Response getVolumeInfo(String volume, final String info,
final String prefix,
final int maxKeys,
final String prevKey,
final boolean rootScan,
Request req,
final UriInfo uriInfo, HttpHeaders headers)
throws OzoneException {
MDC.put(OZONE_FUNCTION, "getVolumeInfo");
return new VolumeProcessTemplate() {
@Override
public Response doProcess(VolumeArgs args)
throws IOException, OzoneException {
switch (info) {
case Header.OZONE_LIST_QUERY_BUCKET:
return getBucketsInVolume(args); // Return list of Buckets
case Header.OZONE_LIST_QUERY_VOLUME:
return getVolumeInfoResponse(args); // Return volume info
case Header.OZONE_LIST_QUERY_SERVICE:
return getVolumesByUser(args); // Return list of volumes
default:
LOG.debug("Unrecognized query param : {} ", info);
OzoneException ozoneException =
ErrorTable.newError(ErrorTable.INVALID_QUERY_PARAM, args);
ozoneException.setMessage("Unrecognized query param : " + info);
throw ozoneException;
case Header.OZONE_LIST_QUERY_BUCKET:
MDC.put(OZONE_FUNCTION, "ListBucket");
return getBucketsInVolume(args, prefix, maxKeys, prevKey);
case Header.OZONE_LIST_QUERY_VOLUME:
MDC.put(OZONE_FUNCTION, "InfoVolume");
assertNoListParamPresent(uriInfo, args);
return getVolumeInfoResponse(args); // Return volume info
case Header.OZONE_LIST_QUERY_SERVICE:
MDC.put(OZONE_FUNCTION, "ListVolume");
return getVolumesByUser(args, prefix, maxKeys, prevKey, rootScan);
default:
LOG.debug("Unrecognized query param : {} ", info);
OzoneException ozoneException =
ErrorTable.newError(ErrorTable.INVALID_QUERY_PARAM, args);
ozoneException.setMessage("Unrecognized query param : " + info);
throw ozoneException;
}
}
}.handleCall(volume, req, uriInfo, headers);
}
/**
* Asserts no list query param is present during this call.
*
* @param uriInfo - UriInfo. - UriInfo
* @param args - Volume Args - VolumeArgs.
* @throws OzoneException
*/
private void assertNoListParamPresent(final UriInfo uriInfo, VolumeArgs
args) throws
OzoneException {
String prefix = uriInfo.getQueryParameters().getFirst("prefix");
String maxKeys = uriInfo.getQueryParameters().getFirst("max_keys");
String prevKey = uriInfo.getQueryParameters().getFirst("prev_key");
if ((prefix != null && !prefix.equals(Header.OZONE_EMPTY_STRING)) ||
(maxKeys != null && !maxKeys.equals(Header.OZONE_DEFAULT_LIST_SIZE)) ||
(prevKey != null && !prevKey.equals(Header.OZONE_EMPTY_STRING))) {
throw ErrorTable.newError(ErrorTable.INVALID_QUERY_PARAM, args);
}
}
}

View File

@ -197,76 +197,68 @@ public abstract class VolumeProcessTemplate {
return OzoneUtils.getResponse(args, HTTP_OK, info.toJsonString());
}
/**
* Returns all the volumes belonging to a user.
*
* @param user - userArgs
*
* @return - Response
*
* @throws OzoneException
* @throws IOException
*/
Response getVolumesByUser(UserArgs user) throws OzoneException, IOException {
StorageHandler fs = StorageHandlerBuilder.getStorageHandler();
ListVolumes volumes = fs.listVolumes(user);
return OzoneUtils.getResponse(user, HTTP_OK, volumes.toJsonString());
}
Response getVolumesByUser(UserArgs user, String prefix, int maxKeys,
String prevKey, boolean rootScan) throws OzoneException, IOException {
/**
* This call can also be invoked by Admins of the system where they can
* get the list of buckets of any user.
*
* User makes a call like
* GET / HTTP/1.1
* Host: ozone.self
*
* @param args - volumeArgs
*
* @return Response - A list of buckets owned this user
*
* @throws OzoneException
*/
Response getVolumesByUser(VolumeArgs args) throws OzoneException {
String validatedUser = args.getUserName();
String validatedUser = user.getUserName();
try {
UserAuth auth = UserHandlerBuilder.getAuthHandler();
if (auth.isAdmin(args)) {
validatedUser = auth.getOzoneUser(args);
if(rootScan && !auth.isAdmin(user)) {
throw ErrorTable.newError(ErrorTable.UNAUTHORIZED, user);
}
if (auth.isAdmin(user)) {
validatedUser = auth.getOzoneUser(user);
if (validatedUser == null) {
validatedUser = auth.getUser(args);
validatedUser = auth.getUser(user);
}
}
UserArgs user =
new UserArgs(validatedUser, args.getRequestID(), args.getHostName(),
args.getRequest(), args.getUri(), args.getHeaders());
return getVolumesByUser(user);
UserArgs onBehalfOf =
new UserArgs(validatedUser, user.getRequestID(), user.getHostName(),
user.getRequest(), user.getUri(), user.getHeaders());
StorageHandler fs = StorageHandlerBuilder.getStorageHandler();
ListArgs<UserArgs> listArgs = new ListArgs<>(onBehalfOf, prefix,
maxKeys, prevKey);
listArgs.setRootScan(rootScan);
ListVolumes volumes = fs.listVolumes(listArgs);
return OzoneUtils.getResponse(user, HTTP_OK, volumes.toJsonString());
} catch (IOException ex) {
LOG.debug("unable to get the volume list for the user. Ex: {}", ex);
OzoneException exp = ErrorTable.newError(ErrorTable.SERVER_ERROR,
args, ex);
user, ex);
exp.setMessage("unable to get the volume list for the user");
throw exp;
}
}
/**
* Returns a list of Buckets in a Volume.
*
* @param args - VolumeArgs
* @param prefix - Prefix to Match
* @param maxKeys - Max results to return.
* @param prevKey - PrevKey
* @return List of Buckets
*
* @throws OzoneException
*/
Response getBucketsInVolume(VolumeArgs args) throws OzoneException {
Response getBucketsInVolume(VolumeArgs args, String prefix, int maxKeys,
String prevKey) throws OzoneException {
try {
// UserAuth auth = UserHandlerBuilder.getAuthHandler();
// TODO : Check ACLS.
StorageHandler fs = StorageHandlerBuilder.getStorageHandler();
ListBuckets bucketList = fs.listBuckets(args);
ListArgs<VolumeArgs> listArgs = new ListArgs<>(args, prefix,
maxKeys, prevKey);
ListBuckets bucketList = fs.listBuckets(listArgs);
return OzoneUtils.getResponse(args, HTTP_OK, bucketList.toJsonString());
} catch (IOException ex) {
LOG.debug("unable to get the bucket list for the specified volume." +

View File

@ -31,6 +31,8 @@ public final class Header {
public static final String OZONE_QUOTA_TB = "TB";
public static final String OZONE_QUOTA_REMOVE = "remove";
public static final String OZONE_QUOTA_UNDEFINED = "undefined";
public static final String OZONE_EMPTY_STRING="";
public static final String OZONE_DEFAULT_LIST_SIZE = "1000";
public static final String OZONE_USER = "x-ozone-user";
public static final String OZONE_SIMPLE_AUTHENTICATION_SCHEME = "OZONE";
@ -57,8 +59,10 @@ public final class Header {
public static final String OZONE_LIST_QUERY_TAG ="info";
public static final String OZONE_QUOTA_QUERY_TAG ="quota";
public static final String CONTENT_MD5 = "Content-MD5";
public static final String OZONE_LIST_QUERY_PREFIX="prefix";
public static final String OZONE_LIST_QUERY_MAXKEYS="max-keys";
public static final String OZONE_LIST_QUERY_PREVKEY="prev-key";
public static final String OZONE_LIST_QUERY_ROOTSCAN="root-scan";
private Header() {
// Never constructed.

View File

@ -108,7 +108,7 @@ public interface Bucket {
* @param info - Information type needed
* @param prefix - Prefix for the keys to be fetched
* @param maxKeys - MaxNumber of Keys to Return
* @param startPage - Continuation Token
* @param prevKey - Continuation Token
* @param req - Http request
* @param headers - Http headers
*
@ -122,10 +122,14 @@ public interface Bucket {
Response listBucket(@PathParam("volume") String volume,
@PathParam("bucket") String bucket,
@DefaultValue(Header.OZONE_LIST_QUERY_KEY)
@QueryParam("info") String info,
@QueryParam("prefix") String prefix,
@DefaultValue("1000") @QueryParam("max-keys") int maxKeys,
@QueryParam("start-page") String startPage,
@QueryParam(Header.OZONE_LIST_QUERY_TAG)
String info,
@QueryParam(Header.OZONE_LIST_QUERY_PREFIX)
String prefix,
@QueryParam(Header.OZONE_LIST_QUERY_MAXKEYS)
int maxKeys,
@QueryParam(Header.OZONE_LIST_QUERY_PREVKEY)
String prevKey,
@Context Request req, @Context UriInfo uriInfo,
@Context HttpHeaders headers) throws OzoneException;

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets;
@ -98,14 +97,14 @@ public interface StorageHandler {
/**
* Returns the List of Volumes owned by the specific user.
*
* @param args - UserArgs
* @param args - ListArgs
*
* @return - List of Volumes
*
* @throws IOException
* @throws OzoneException
*/
ListVolumes listVolumes(UserArgs args) throws IOException, OzoneException;
ListVolumes listVolumes(ListArgs args) throws IOException, OzoneException;
/**
* Deletes an Empty Volume.
@ -190,13 +189,14 @@ public interface StorageHandler {
/**
* Returns all Buckets of a specified Volume.
*
* @param args --User Args
* @param listArgs -- List Args.
*
* @return ListAllBuckets
*
* @throws OzoneException
*/
ListBuckets listBuckets(VolumeArgs args) throws IOException, OzoneException;
ListBuckets listBuckets(ListArgs listArgs) throws
IOException, OzoneException;
/**

View File

@ -66,7 +66,7 @@ public interface Volume {
@POST
Response createVolume(@PathParam("volume") String volume,
@DefaultValue(Header.OZONE_QUOTA_UNDEFINED)
@QueryParam("quota") String quota,
@QueryParam(Header.OZONE_QUOTA_QUERY_TAG) String quota,
@Context Request req,
@Context UriInfo uriInfo,
@Context HttpHeaders headers)
@ -94,7 +94,7 @@ public interface Volume {
@PUT
Response updateVolume(@PathParam("volume") String volume,
@DefaultValue(Header.OZONE_QUOTA_UNDEFINED)
@QueryParam("quota") String quota,
@QueryParam(Header.OZONE_QUOTA_QUERY_TAG) String quota,
@Context Request req,
@Context UriInfo uriInfo,
@Context HttpHeaders headers)
@ -132,7 +132,16 @@ public interface Volume {
@GET
Response getVolumeInfo(@PathParam("volume") String volume,
@DefaultValue(Header.OZONE_LIST_QUERY_BUCKET)
@QueryParam("info") String info,
@QueryParam(Header.OZONE_LIST_QUERY_TAG)
String info,
@QueryParam(Header.OZONE_LIST_QUERY_PREFIX)
String prefix,
@QueryParam(Header.OZONE_LIST_QUERY_MAXKEYS)
int keys,
@QueryParam(Header.OZONE_LIST_QUERY_PREVKEY)
String prevKey,
@QueryParam(Header.OZONE_LIST_QUERY_ROOTSCAN)
boolean rootScan,
@Context Request req,
@Context UriInfo uriInfo,
@Context HttpHeaders headers)

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
@ -152,12 +151,12 @@ public class LocalStorageHandler implements StorageHandler {
/**
* Returns the List of Volumes owned by the specific user.
*
* @param args - UserArgs
* @param args - ListArgs
* @return - List of Volumes
* @throws IOException
*/
@Override
public ListVolumes listVolumes(UserArgs args)
public ListVolumes listVolumes(ListArgs args)
throws IOException, OzoneException {
OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
@ -255,7 +254,7 @@ public class LocalStorageHandler implements StorageHandler {
* @throws OzoneException
*/
@Override
public ListBuckets listBuckets(VolumeArgs args)
public ListBuckets listBuckets(ListArgs args)
throws IOException, OzoneException {
OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);

View File

@ -17,36 +17,38 @@
*/
package org.apache.hadoop.ozone.web.localstorage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.utils.LevelDBStore;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.request.OzoneAcl;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.apache.hadoop.ozone.OzoneConsts;
import org.iq80.leveldb.DBException;
import org.apache.commons.codec.digest.DigestUtils;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
@ -375,21 +377,180 @@ public final class OzoneMetadataManager {
* @return - ListVolumes
* @throws OzoneException
*/
public ListVolumes listVolumes(UserArgs args) throws OzoneException {
public ListVolumes listVolumes(ListArgs args) throws OzoneException {
lock.readLock().lock();
try {
byte[] volumeList = userDB.get(args.getUserName().getBytes(encoding));
if (volumeList == null) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args);
Preconditions.checkState(args.getArgs() instanceof UserArgs);
if (args.isRootScan()) {
return listAllVolumes(args);
}
return ListVolumes.parse(new String(volumeList, encoding));
UserArgs uArgs = (UserArgs) args.getArgs();
byte[] volumeList = userDB.get(uArgs.getUserName().getBytes(encoding));
if (volumeList == null) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, uArgs);
}
String prefix = args.getPrefix();
int maxCount = args.getMaxKeys();
String prevKey = args.getPrevKey();
if (prevKey != null) {
// Format is username/volumeName, in local mode we don't use the
// user name since we have a userName DB.
String[] volName = args.getPrevKey().split("/");
if (volName.length < 2) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, uArgs);
}
prevKey = volName[1];
}
return getFilteredVolumes(volumeList, prefix, prevKey, maxCount);
} catch (IOException | DBException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns a List of Volumes that meet the prefix, prevkey and maxCount
* constraints.
*
* @param volumeList - Byte Array of Volume Info.
* @param prefix - prefix string.
* @param prevKey - PrevKey
* @param maxCount - Maximum Count.
* @return ListVolumes.
* @throws IOException
*/
private ListVolumes getFilteredVolumes(byte[] volumeList, String prefix,
String prevKey, int maxCount) throws
IOException {
ListVolumes volumes = ListVolumes.parse(new String(volumeList,
encoding));
int currentCount = 0;
ListIterator<VolumeInfo> iter = volumes.getVolumes().listIterator();
ListVolumes filteredVolumes = new ListVolumes();
while (currentCount < maxCount && iter.hasNext()) {
VolumeInfo vInfo = iter.next();
if (isMatchingPrefix(prefix, vInfo) && isAfterKey(prevKey, vInfo)) {
filteredVolumes.addVolume(vInfo);
currentCount++;
}
}
return filteredVolumes;
}
/**
* Returns all volumes in a cluster.
*
* @param args - ListArgs.
* @return ListVolumes.
* @throws OzoneException
*/
public ListVolumes listAllVolumes(ListArgs args) throws OzoneException,
IOException {
String prefix = args.getPrefix();
String prevKey = args.getPrevKey();
int maxCount = args.getMaxKeys();
String userName = null;
DBIterator iterator = this.userDB.getDB().iterator();
if (prevKey != null) {
// Format is username/volumeName
String[] volName = args.getPrevKey().split("/");
if (volName.length < 2) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
seekToUser(iterator, volName[0]);
userName = new String(iterator.peekNext().getKey(), encoding);
prevKey = volName[1];
} else {
userName = getFirstUser(iterator);
}
if (userName == null || userName.isEmpty()) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
ListVolumes returnSet = new ListVolumes();
int count = maxCount - returnSet.getVolumes().size();
// we need to iterate through users until we get maxcount volumes
// or no more volumes are left.
while (iterator.hasNext() && count > 0) {
userName = new String(iterator.next().getKey(), encoding);
byte[] volumeList = userDB.get(userName.getBytes(encoding));
if (volumeList == null) {
throw ErrorTable.newError(ErrorTable.USER_NOT_FOUND, args.getArgs());
}
returnSet.getVolumes().addAll(
getFilteredVolumes(volumeList, prefix, prevKey, count).getVolumes());
count = maxCount - returnSet.getVolumes().size();
}
return returnSet;
}
/**
* Returns the first user name from the UserDB.
*
* @return - UserName.
* @throws IOException
*/
String getFirstUser(DBIterator iterator) throws IOException {
iterator.seekToFirst();
if (iterator.hasNext()) {
return new String(iterator.peekNext().getKey(), encoding);
}
return null;
}
/**
* Reposition the DB cursor to the user name.
*
* @param iterator - Current Iterator.
* @param userName - userName to seek to
* @return - DBIterator.
* @throws IOException
*/
DBIterator seekToUser(DBIterator iterator, String userName) throws
IOException {
iterator.seek(userName.getBytes(encoding));
return iterator;
}
/**
* Checks if a name starts with a matching prefix.
*
* @param prefix - prefix string.
* @param vInfo - volume info.
* @return true or false.
*/
private boolean isMatchingPrefix(String prefix, VolumeInfo vInfo) {
if (prefix == null || prefix.isEmpty()) {
return true;
}
return vInfo.getVolumeName().startsWith(prefix);
}
/**
* Checks if the key is after the prevKey.
*
* @param prevKey - String prevKey.
* @param vInfo - volume Info.
* @return - true or false.
*/
private boolean isAfterKey(String prevKey, VolumeInfo vInfo) {
if (prevKey == null || prevKey.isEmpty()) {
return true;
}
return prevKey.compareTo(vInfo.getVolumeName()) < 0;
}
/**
* Deletes a volume if it exists and is empty.
*
@ -683,26 +844,31 @@ public final class OzoneMetadataManager {
* @return List of buckets
* @throws OzoneException
*/
public ListBuckets listBuckets(VolumeArgs args) throws OzoneException {
public ListBuckets listBuckets(ListArgs args) throws OzoneException {
lock.readLock().lock();
try {
String userVolKey = args.getUserName() + "/" + args.getVolumeName();
Preconditions.checkState(args.getArgs() instanceof VolumeArgs);
VolumeArgs vArgs = (VolumeArgs) args.getArgs();
String userVolKey = vArgs.getUserName() + "/" + vArgs.getVolumeName();
// TODO : Query using Prefix and PrevKey
byte[] bucketBytes = userDB.get(userVolKey.getBytes(encoding));
if (bucketBytes == null) {
throw ErrorTable.newError(ErrorTable.INVALID_VOLUME_NAME, args);
throw ErrorTable.newError(ErrorTable.INVALID_VOLUME_NAME,
args.getArgs());
}
return ListBuckets.parse(new String(bucketBytes, encoding));
} catch (IOException ex) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, ex);
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args.getArgs(), ex);
} finally {
lock.readLock().unlock();
}
}
/**
* Creates a key and returns a stream to which this key can be written to.
* @param args KeyArgs
* Creates a key and returns a stream to which this key can be written to.
*
* @param args KeyArgs
* @return - A stream into which key can be written to.
* @throws OzoneException
*/
@ -721,13 +887,13 @@ public final class OzoneMetadataManager {
// only if the upload is successful.
if (f.exists()) {
LOG.debug("we are overwriting a file. This is by design.");
if(!f.delete()) {
if (!f.delete()) {
LOG.error("Unable to delete the file: {}", fullPath);
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args);
}
}
// f.createNewFile();
// f.createNewFile();
FileOutputStream fsStream = new FileOutputStream(f);
inProgressObjects.put(fsStream, fullPath);
@ -739,14 +905,11 @@ public final class OzoneMetadataManager {
}
}
/**
* commit keys moves an In progress object into the metadata store
* so that key is visible in the metadata operations from that point
* onwards.
* commit keys moves an In progress object into the metadata store so that key
* is visible in the metadata operations from that point onwards.
*
* @param args Object args
*
* @throws OzoneException
*/
public void commitKey(KeyArgs args, OutputStream stream)
@ -813,7 +976,7 @@ public final class OzoneMetadataManager {
keyInfo.toDBString().getBytes(encoding));
metadataDB.put(args.getParentName().getBytes(encoding),
bInfo.toDBString().getBytes(encoding));
bInfo.toDBString().getBytes(encoding));
userDB.put(args.getParentName().getBytes(encoding),
keyList.toDBString().getBytes(encoding));
@ -829,7 +992,6 @@ public final class OzoneMetadataManager {
* deletes an key from a given bucket.
*
* @param args - ObjectArgs
*
* @throws OzoneException
*/
public void deleteKey(KeyArgs args) throws OzoneException {
@ -867,7 +1029,7 @@ public final class OzoneMetadataManager {
File f = new File(fullPath);
if (f.exists()) {
if(!f.delete()) {
if (!f.delete()) {
throw ErrorTable.newError(ErrorTable.KEY_OPERATION_CONFLICT, args);
}
} else {
@ -877,7 +1039,7 @@ public final class OzoneMetadataManager {
metadataDB.delete(args.getResourceName().getBytes(encoding));
metadataDB.put(args.getParentName().getBytes(encoding),
bInfo.toDBString().getBytes(encoding));
bInfo.toDBString().getBytes(encoding));
userDB.put(args.getParentName().getBytes(encoding),
keyList.toDBString().getBytes(encoding));
} catch (IOException e) {
@ -891,9 +1053,7 @@ public final class OzoneMetadataManager {
* Returns a Stream for the file.
*
* @param args - Object args
*
* @return Stream
*
* @throws IOException
* @throws OzoneException
*/
@ -918,24 +1078,28 @@ public final class OzoneMetadataManager {
/**
* Returns keys in a bucket.
*
* @param args
* @return List of keys.
* @return List of keys.
* @throws IOException
* @throws OzoneException
*/
public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
lock.readLock().lock();
// TODO : Support Prefix and PrevKey lookup.
try {
byte[] bucketInfo = metadataDB.get(args.getResourceName()
Preconditions.checkState(args.getArgs() instanceof BucketArgs);
BucketArgs bArgs = (BucketArgs) args.getArgs();
byte[] bucketInfo = metadataDB.get(bArgs.getResourceName()
.getBytes(encoding));
if (bucketInfo == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, bArgs);
}
byte[] bucketListBytes = userDB.get(args.getResourceName()
byte[] bucketListBytes = userDB.get(bArgs.getResourceName()
.getBytes(encoding));
if (bucketListBytes == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, bArgs);
}
return ListKeys.parse(new String(bucketListBytes, encoding));
} finally {

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.web.response;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.codehaus.jackson.annotate.JsonAutoDetect;
import org.codehaus.jackson.annotate.JsonMethod;
@ -57,7 +59,8 @@ public class ListKeys {
* @param truncated is truncated
*/
public ListKeys(ListArgs args, boolean truncated) {
this.name = args.getBucketName();
Preconditions.checkState(args.getArgs() instanceof BucketArgs);
this.name = ((BucketArgs) args.getArgs()).getBucketName();
this.prefix = args.getPrefix();
this.maxKeys = args.getMaxKeys();
this.truncated = truncated;

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.BucketInfo;
@ -121,7 +120,7 @@ public final class DistributedStorageHandler implements StorageHandler {
}
@Override
public ListVolumes listVolumes(UserArgs args)
public ListVolumes listVolumes(ListArgs args)
throws IOException, OzoneException {
throw new UnsupportedOperationException("listVolumes not implemented");
}
@ -204,7 +203,7 @@ public final class DistributedStorageHandler implements StorageHandler {
}
@Override
public ListBuckets listBuckets(VolumeArgs args)
public ListBuckets listBuckets(ListArgs args)
throws IOException, OzoneException {
throw new UnsupportedOperationException("listBuckets not implemented");
}

View File

@ -209,8 +209,9 @@ message DeleteContainerResponseProto {
message ListContainerRequestProto {
required Pipeline pipeline = 1;
optional string prevKey = 2; // if this is not set query from start.
optional string prefix = 2;
required uint32 count = 3; // Max Results to return
optional string prevKey = 4; // if this is not set query from start.
}
message ListContainerResponseProto {

View File

@ -266,7 +266,7 @@ public class TestContainerPersistence {
String prevKey = "";
List<ContainerData> results = new LinkedList<>();
while (counter < count) {
containerManager.listContainer(prevKey, step, results);
containerManager.listContainer(null, step, prevKey, results);
for (int y = 0; y < results.size(); y++) {
testMap.remove(results.get(y).getContainerName());
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.web.client;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -29,9 +30,11 @@ import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
@ -65,6 +68,7 @@ public class TestVolume {
String path = p.getPath().concat(TestVolume.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
FileUtils.deleteDirectory(new File(path));
conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
@ -158,4 +162,55 @@ public class TestVolume {
assertTrue(ovols.size() >= 10);
}
@Test
public void testListVolumePagination() throws OzoneException, IOException {
final int volCount = 2000;
final int step = 100;
client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER);
for (int x = 0; x < volCount; x++) {
String volumeName = OzoneUtils.getRequestID().toLowerCase();
OzoneVolume vol = client.createVolume(volumeName, "frodo", "100TB");
assertNotNull(vol);
}
OzoneVolume prevKey = null;
int count = 0;
int pagecount = 0;
while (count < volCount) {
List<OzoneVolume> ovols = client.listVolumes("frodo", null, step,
prevKey);
count += ovols.size();
prevKey = ovols.get(ovols.size() - 1);
pagecount++;
}
Assert.assertEquals(volCount / step, pagecount);
}
@Test
public void testListAllVolumes() throws OzoneException, IOException {
final int volCount = 200;
final int step = 10;
client.setUserAuth(OzoneConsts.OZONE_SIMPLE_HDFS_USER);
for (int x = 0; x < volCount; x++) {
String userName = "frodo" + x;
String volumeName = "vol"+ x;
OzoneVolume vol = client.createVolume(volumeName, userName, "100TB");
assertNotNull(vol);
}
OzoneVolume prevKey = null;
int count = 0;
int pagecount = 0;
while (count < volCount) {
List<OzoneVolume> ovols = client.listAllVolumes(null, step,
prevKey);
count += ovols.size();
if(ovols.size() > 0) {
prevKey = ovols.get(ovols.size() - 1);
}
pagecount++;
}
// becasue we are querying an existing ozone store, there will
// be volumes created by other tests too. So we should get more page counts.
Assert.assertEquals(volCount / step , pagecount);
}
}