HDDS-659. Implement pagination in GET bucket (object list) endpoint. Contributed by Bharat Viswanadham.

This commit is contained in:
Márton Elek 2018-10-31 12:21:38 +01:00
parent ecac351aac
commit b519f3f2a0
6 changed files with 400 additions and 39 deletions

View File

@ -36,7 +36,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.time.Instant; import java.time.Instant;
import java.util.Iterator; import java.util.Iterator;
import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneKey; import org.apache.hadoop.ozone.client.OzoneKey;
@ -48,10 +47,13 @@ import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable; import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.ozone.s3.util.S3utils;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.ozone.s3.util.S3Consts.ENCODING_TYPE;
/** /**
* Bucket level rest endpoints. * Bucket level rest endpoints.
*/ */
@ -76,6 +78,8 @@ public class BucketEndpoint extends EndpointBase {
@DefaultValue("1000") @QueryParam("max-keys") int maxKeys, @DefaultValue("1000") @QueryParam("max-keys") int maxKeys,
@QueryParam("prefix") String prefix, @QueryParam("prefix") String prefix,
@QueryParam("browser") String browser, @QueryParam("browser") String browser,
@QueryParam("continuation-token") String continueToken,
@QueryParam("start-after") String startAfter,
@Context HttpHeaders hh) throws OS3Exception, IOException { @Context HttpHeaders hh) throws OS3Exception, IOException {
if (browser != null) { if (browser != null) {
@ -87,60 +91,91 @@ public class BucketEndpoint extends EndpointBase {
} }
} }
if (delimiter == null) {
delimiter = "/";
}
if (prefix == null) { if (prefix == null) {
prefix = ""; prefix = "";
} }
OzoneBucket bucket = getBucket(bucketName); OzoneBucket bucket = getBucket(bucketName);
Iterator<? extends OzoneKey> ozoneKeyIterator = bucket.listKeys(prefix); Iterator<? extends OzoneKey> ozoneKeyIterator;
String decodedToken = S3utils.decodeContinueToken(continueToken);
if (startAfter != null && continueToken != null) {
// If continuation token and start after both are provided, then we
// ignore start After
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken);
} else if (startAfter != null && continueToken == null) {
ozoneKeyIterator = bucket.listKeys(prefix, startAfter);
} else if (startAfter == null && continueToken != null){
ozoneKeyIterator = bucket.listKeys(prefix, decodedToken);
} else {
ozoneKeyIterator = bucket.listKeys(prefix);
}
ListObjectResponse response = new ListObjectResponse(); ListObjectResponse response = new ListObjectResponse();
response.setDelimiter(delimiter); response.setDelimiter(delimiter);
response.setName(bucketName); response.setName(bucketName);
response.setPrefix(prefix); response.setPrefix(prefix);
response.setMarker(""); response.setMarker("");
response.setMaxKeys(1000); response.setMaxKeys(maxKeys);
response.setEncodingType("url"); response.setEncodingType(ENCODING_TYPE);
response.setTruncated(false); response.setTruncated(false);
response.setContinueToken(continueToken);
String prevDir = null; String prevDir = null;
String lastKey = null;
int count = 0;
while (ozoneKeyIterator.hasNext()) { while (ozoneKeyIterator.hasNext()) {
OzoneKey next = ozoneKeyIterator.next(); OzoneKey next = ozoneKeyIterator.next();
String relativeKeyName = next.getName().substring(prefix.length()); String relativeKeyName = next.getName().substring(prefix.length());
int depth = int depth = StringUtils.countMatches(relativeKeyName, delimiter);
StringUtils.countMatches(relativeKeyName, delimiter); if (delimiter != null) {
if (depth > 0) {
// means key has multiple delimiters in its value.
// ex: dir/dir1/dir2, where delimiter is "/" and prefix is dir/
String dirName = relativeKeyName.substring(0, relativeKeyName
.indexOf(delimiter));
if (!dirName.equals(prevDir)) {
response.addPrefix(prefix + dirName + delimiter);
prevDir = dirName;
count++;
}
} else if (relativeKeyName.endsWith(delimiter)) {
// means or key is same as prefix with delimiter at end and ends with
// delimiter. ex: dir/, where prefix is dir and delimiter is /
response.addPrefix(relativeKeyName);
count++;
} else {
// means our key is matched with prefix if prefix is given and it
// does not have any common prefix.
addKey(response, next);
count++;
}
} else {
addKey(response, next);
count++;
}
if (prefix.length() > 0 && !prefix.endsWith(delimiter) if (count == maxKeys) {
&& relativeKeyName.length() > 0) { lastKey = next.getName();
response.addPrefix(prefix + "/");
break; break;
} }
if (depth > 0) {
String dirName = relativeKeyName
.substring(0, relativeKeyName.indexOf(delimiter));
if (!dirName.equals(prevDir)) {
response.addPrefix(
prefix + dirName + delimiter);
prevDir = dirName;
}
} else if (relativeKeyName.endsWith(delimiter)) {
response.addPrefix(relativeKeyName);
} else if (relativeKeyName.length() > 0) {
KeyMetadata keyMetadata = new KeyMetadata();
keyMetadata.setKey(next.getName());
keyMetadata.setSize(next.getDataSize());
keyMetadata.setETag("" + next.getModificationTime());
keyMetadata.setStorageClass("STANDARD");
keyMetadata
.setLastModified(Instant.ofEpochMilli(next.getModificationTime()));
response.addKey(keyMetadata);
}
} }
response.setKeyCount(count);
if (count < maxKeys) {
response.setTruncated(false);
} else if(ozoneKeyIterator.hasNext()) {
response.setTruncated(true);
response.setNextToken(S3utils.generateContinueToken(lastKey));
} else {
response.setTruncated(false);
}
response.setKeyCount( response.setKeyCount(
response.getCommonPrefixes().size() + response.getContents().size()); response.getCommonPrefixes().size() + response.getContents().size());
return Response.ok(response).build(); return Response.ok(response).build();
@ -253,4 +288,14 @@ public class BucketEndpoint extends EndpointBase {
} }
return result; return result;
} }
private void addKey(ListObjectResponse response, OzoneKey next) {
KeyMetadata keyMetadata = new KeyMetadata();
keyMetadata.setKey(next.getName());
keyMetadata.setSize(next.getDataSize());
keyMetadata.setETag("" + next.getModificationTime());
keyMetadata.setStorageClass("STANDARD");
keyMetadata.setLastModified(Instant.ofEpochMilli(next.getModificationTime()));
response.addKey(keyMetadata);
}
} }

View File

@ -60,6 +60,12 @@ public class ListObjectResponse {
@XmlElement(name = "IsTruncated") @XmlElement(name = "IsTruncated")
private boolean isTruncated; private boolean isTruncated;
@XmlElement(name = "NextContinuationToken")
private String nextToken;
@XmlElement(name = "continueToken")
private String continueToken;
@XmlElement(name = "Contents") @XmlElement(name = "Contents")
private List<KeyMetadata> contents = new ArrayList<>(); private List<KeyMetadata> contents = new ArrayList<>();
@ -148,6 +154,22 @@ public class ListObjectResponse {
commonPrefixes.add(new CommonPrefix(relativeKeyName)); commonPrefixes.add(new CommonPrefix(relativeKeyName));
} }
public String getNextToken() {
return nextToken;
}
public void setNextToken(String nextToken) {
this.nextToken = nextToken;
}
public String getContinueToken() {
return continueToken;
}
public void setContinueToken(String continueToken) {
this.continueToken = continueToken;
}
public int getKeyCount() { public int getKeyCount() {
return keyCount; return keyCount;
} }

View File

@ -15,5 +15,6 @@ public final class S3Consts {
public static final String COPY_SOURCE_HEADER = "x-amz-copy-source"; public static final String COPY_SOURCE_HEADER = "x-amz-copy-source";
public static final String STORAGE_CLASS_HEADER = "x-amz-storage-class"; public static final String STORAGE_CLASS_HEADER = "x-amz-storage-class";
public static final String ENCODING_TYPE = "url";
} }

View File

@ -0,0 +1,73 @@
package org.apache.hadoop.ozone.s3.util;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import java.nio.charset.StandardCharsets;
/**
* Utility class for S3.
*/
public final class S3utils {
private S3utils() {
}
private static final String CONTINUE_TOKEN_SEPERATOR = "-";
/**
* Generate a continuation token which is used in get Bucket.
* @param key
* @return if key is not null return continuation token, else returns null.
*/
public static String generateContinueToken(String key) {
if (key != null) {
byte[] byteData = key.getBytes(StandardCharsets.UTF_8);
String hex = Hex.encodeHexString(byteData);
String digest = DigestUtils.sha256Hex(key);
return hex + CONTINUE_TOKEN_SEPERATOR + digest;
} else {
return null;
}
}
/**
* Decode a continuation token which is used in get Bucket.
* @param key
* @return if key is not null return decoded token, otherwise returns null.
* @throws OS3Exception
*/
public static String decodeContinueToken(String key) throws OS3Exception {
if (key != null) {
int indexSeparator = key.indexOf(CONTINUE_TOKEN_SEPERATOR);
if (indexSeparator == -1) {
throw S3ErrorTable.newError(S3ErrorTable.INVALID_ARGUMENT, key);
}
String hex = key.substring(0, indexSeparator);
String digest = key.substring(indexSeparator + 1);
try {
byte[] actualKeyBytes = Hex.decodeHex(hex);
String digestActualKey = DigestUtils.sha256Hex(actualKeyBytes);
if (digest.equals(digestActualKey)) {
return new String(actualKeyBytes);
} else {
OS3Exception ex = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, key);
ex.setErrorMessage("The continuation token provided is incorrect");
throw ex;
}
} catch (DecoderException ex) {
OS3Exception os3Exception = S3ErrorTable.newError(S3ErrorTable
.INVALID_ARGUMENT, key);
os3Exception.setErrorMessage("The continuation token provided is " +
"incorrect");
throw os3Exception;
}
} else {
return null;
}
}
}

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationFactor;
@ -116,7 +117,8 @@ public class OzoneBucketStub extends OzoneBucket {
@Override @Override
public Iterator<? extends OzoneKey> listKeys(String keyPrefix) { public Iterator<? extends OzoneKey> listKeys(String keyPrefix) {
return keyDetails.values() Map<String, OzoneKey> sortedKey = new TreeMap<String, OzoneKey>(keyDetails);
return sortedKey.values()
.stream() .stream()
.filter(key -> key.getName().startsWith(keyPrefix)) .filter(key -> key.getName().startsWith(keyPrefix))
.collect(Collectors.toList()) .collect(Collectors.toList())
@ -126,7 +128,8 @@ public class OzoneBucketStub extends OzoneBucket {
@Override @Override
public Iterator<? extends OzoneKey> listKeys(String keyPrefix, public Iterator<? extends OzoneKey> listKeys(String keyPrefix,
String prevKey) { String prevKey) {
return keyDetails.values() Map<String, OzoneKey> sortedKey = new TreeMap<String, OzoneKey>(keyDetails);
return sortedKey.values()
.stream() .stream()
.filter(key -> key.getName().compareTo(prevKey) > 0) .filter(key -> key.getName().compareTo(prevKey) > 0)
.filter(key -> key.getName().startsWith(keyPrefix)) .filter(key -> key.getName().startsWith(keyPrefix))

View File

@ -29,6 +29,8 @@ import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.fail;
/** /**
* Testing basic object list browsing. * Testing basic object list browsing.
*/ */
@ -45,7 +47,7 @@ public class TestBucketGet {
ListObjectResponse getBucketResponse = ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket (ListObjectResponse) getBucket
.list("b1", "/", null, null, 100, "", null, null) .list("b1", "/", null, null, 100, "", null, null, null, null)
.getEntity(); .getEntity();
Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size()); Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size());
@ -68,8 +70,8 @@ public class TestBucketGet {
getBucket.setClient(client); getBucket.setClient(client);
ListObjectResponse getBucketResponse = ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket (ListObjectResponse) getBucket.list("b1", "/", null, null, 100,
.list("b1", "/", null, null, 100, "dir1", null, null).getEntity(); "dir1", null, null, null, null).getEntity();
Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size()); Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size());
Assert.assertEquals("dir1/", Assert.assertEquals("dir1/",
@ -85,13 +87,16 @@ public class TestBucketGet {
BucketEndpoint getBucket = new BucketEndpoint(); BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient = OzoneClient ozoneClient =
createClientWithKeys("dir1/file2", "dir1/dir2/file2"); createClientWithKeys("dir1/file2", "dir1/dir2/file2", "dir1bh/file",
"dir1bha/file2");
getBucket.setClient(ozoneClient); getBucket.setClient(ozoneClient);
ListObjectResponse getBucketResponse = ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket (ListObjectResponse) getBucket
.list("b1", "/", null, null, 100, "dir1/", null, null).getEntity(); .list("b1", "/", null, null, 100, "dir1/", null, null,
null, null)
.getEntity();
Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size()); Assert.assertEquals(1, getBucketResponse.getCommonPrefixes().size());
Assert.assertEquals("dir1/dir2/", Assert.assertEquals("dir1/dir2/",
@ -103,6 +108,218 @@ public class TestBucketGet {
} }
@Test
public void listWithPrefixAndDelimiter() throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file2", "dir1/dir2/file2", "dir1bh/file",
"dir1bha/file2", "file2");
getBucket.setClient(ozoneClient);
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, 100,
"dir1", null, null, null, null).getEntity();
Assert.assertEquals(3, getBucketResponse.getCommonPrefixes().size());
}
@Test
public void listWithPrefixAndDelimiter1() throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file2", "dir1/dir2/file2", "dir1bh/file",
"dir1bha/file2", "file2");
getBucket.setClient(ozoneClient);
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, 100,
"", null, null, null, null).getEntity();
Assert.assertEquals(3, getBucketResponse.getCommonPrefixes().size());
Assert.assertEquals("file2", getBucketResponse.getContents().get(0)
.getKey());
}
@Test
public void listWithPrefixAndDelimiter2() throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file2", "dir1/dir2/file2", "dir1bh/file",
"dir1bha/file2", "file2");
getBucket.setClient(ozoneClient);
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, 100,
"dir1bh", null, null, "dir1/dir2/file2", null).getEntity();
Assert.assertEquals(2, getBucketResponse.getCommonPrefixes().size());
}
@Test
public void listWithContinuationToken() throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file2", "dir1/dir2/file2", "dir1bh/file",
"dir1bha/file2", "file2");
getBucket.setClient(ozoneClient);
int maxKeys = 2;
// As we have 5 keys, with max keys 2 we should call list 3 times.
// First time
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", null, null, null, maxKeys,
"", null, null, null, null).getEntity();
Assert.assertTrue(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getContents().size() == 2);
// 2nd time
String continueToken = getBucketResponse.getNextToken();
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", null, null, null, maxKeys,
"", null, continueToken, null, null).getEntity();
Assert.assertTrue(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getContents().size() == 2);
continueToken = getBucketResponse.getNextToken();
//3rd time
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", null, null, null, maxKeys,
"", null, continueToken, null, null).getEntity();
Assert.assertFalse(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getContents().size() == 1);
}
@Test
/**
* This test is with prefix and delimiter and verify continuation-token
* behavior.
*/
public void listWithContinuationToken1() throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file1", "dir1bh/file1",
"dir1bha/file1", "dir0/file1", "dir2/file1");
getBucket.setClient(ozoneClient);
int maxKeys = 2;
// As we have 5 keys, with max keys 2 we should call list 3 times.
// First time
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
"dir", null, null, null, null).getEntity();
Assert.assertTrue(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
// 2nd time
String continueToken = getBucketResponse.getNextToken();
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
"dir", null, continueToken, null, null).getEntity();
Assert.assertTrue(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 2);
//3rd time
continueToken = getBucketResponse.getNextToken();
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, maxKeys,
"dir", null, continueToken, null, null).getEntity();
Assert.assertFalse(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getCommonPrefixes().size() == 1);
}
@Test
public void listWithContinuationTokenFail() throws OS3Exception, IOException {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file2", "dir1/dir2/file2", "dir1bh/file",
"dir1bha/file2", "dir1", "dir2", "dir3");
getBucket.setClient(ozoneClient);
try {
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", "/", null, null, 2,
"dir", null, "random", null, null).getEntity();
fail("listWithContinuationTokenFail");
} catch (OS3Exception ex) {
Assert.assertEquals("random", ex.getResource());
Assert.assertEquals("Invalid Argument", ex.getErrorMessage());
}
}
@Test
public void testStartAfter() throws IOException, OS3Exception {
BucketEndpoint getBucket = new BucketEndpoint();
OzoneClient ozoneClient =
createClientWithKeys("dir1/file1", "dir1bh/file1",
"dir1bha/file1", "dir0/file1", "dir2/file1");
getBucket.setClient(ozoneClient);
ListObjectResponse getBucketResponse =
(ListObjectResponse) getBucket.list("b1", null, null, null, 1000,
null, null, null, null, null).getEntity();
Assert.assertFalse(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getContents().size() == 5);
//As our list output is sorted, after seeking to startAfter, we shall
// have 4 keys.
String startAfter = "dir0/file1";
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", null, null, null,
1000, null, null, null, startAfter, null).getEntity();
Assert.assertFalse(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getContents().size() == 4);
getBucketResponse =
(ListObjectResponse) getBucket.list("b1", null, null, null,
1000, null, null, null, "random", null).getEntity();
Assert.assertFalse(getBucketResponse.isTruncated());
Assert.assertTrue(getBucketResponse.getContents().size() == 0);
}
private OzoneClient createClientWithKeys(String... keys) throws IOException { private OzoneClient createClientWithKeys(String... keys) throws IOException {
OzoneClient client = new OzoneClientStub(); OzoneClient client = new OzoneClientStub();