HDFS-10784. Implement WebHdfsFileSystem#listStatusIterator.

(cherry picked from commit 85bab5fb57)
This commit is contained in:
Andrew Wang 2016-08-31 14:29:37 -07:00
parent 72334a4be3
commit 72d90cbe32
10 changed files with 361 additions and 20 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.web;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.ContentSummary;
@ -33,6 +34,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -141,6 +143,25 @@ class JsonUtilClient {
storagePolicy);
}
static DirectoryListing toDirectoryListing(final Map<?, ?> json) {
if (json == null) {
return null;
}
final List<?> list = JsonUtilClient.getList(json,
"partialListing");
HdfsFileStatus[] partialListing = new HdfsFileStatus[list.size()];
int i = 0;
for (Object o : list) {
final Map<?, ?> m = (Map<?, ?>) o;
partialListing[i++] = toFileStatus(m, false);
}
int remainingEntries = getInt(json, "remainingEntries", -1);
Preconditions.checkState(remainingEntries != -1,
"remainingEntries was not set");
return new DirectoryListing(partialListing, remainingEntries);
}
/** Convert a Json map to an ExtendedBlock object. */
static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
if (m == null) {

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@ -79,6 +80,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -106,6 +108,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -1494,6 +1497,58 @@ public class WebHdfsFileSystem extends FileSystem
}.run();
}
private static final byte[] EMPTY_ARRAY = new byte[] {};
private class DirListingIterator<T extends FileStatus> implements
RemoteIterator<T> {
private final Path path;
private DirectoryListing thisListing;
private int i = 0;
private byte[] prevKey = EMPTY_ARRAY;
DirListingIterator(Path path) {
this.path = path;
}
@Override
public boolean hasNext() throws IOException {
if (thisListing == null) {
fetchMore();
}
return i < thisListing.getPartialListing().length ||
thisListing.hasMore();
}
private void fetchMore() throws IOException {
thisListing = new FsPathResponseRunner<DirectoryListing>(
GetOpParam.Op.LISTSTATUS_BATCH,
path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
@Override
DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toDirectoryListing(json);
}
}.run();
i = 0;
prevKey = thisListing.getLastName();
}
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
Preconditions.checkState(hasNext(), "No more items in iterator");
if (i == thisListing.getPartialListing().length) {
fetchMore();
}
return (T)makeQualified(thisListing.getPartialListing()[i++], path);
}
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path f)
throws FileNotFoundException, IOException {
return new DirListingIterator<>(f);
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
final String renewer) throws IOException {

View File

@ -41,7 +41,8 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED),
CHECKACCESS(false, HttpURLConnection.HTTP_OK);
CHECKACCESS(false, HttpURLConnection.HTTP_OK),
LISTSTATUS_BATCH(false, HttpURLConnection.HTTP_OK);
final boolean redirect;
final int expectedHttpResponseCode;

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web.resources;
/**
* Used during batched ListStatus operations.
*/
public class StartAfterParam extends StringParam {
public static final String NAME = "startafter";
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME, null);
public StartAfterParam(final String str) {
super(DOMAIN, str != null && !str.equals(DEFAULT) ? str : null);
}
@Override
public String getName() {
return NAME;
}
}

View File

@ -779,11 +779,13 @@ public class NamenodeWebHdfsMethods {
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
final TokenServiceParam tokenService,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect
final NoRedirectParam noredirect,
@QueryParam(StartAfterParam.NAME) @DefaultValue(StartAfterParam.DEFAULT)
final StartAfterParam startAfter
) throws IOException, InterruptedException {
return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction,
tokenKind, tokenService, noredirect);
tokenKind, tokenService, noredirect, startAfter);
}
/** Handle HTTP GET request. */
@ -822,12 +824,14 @@ public class NamenodeWebHdfsMethods {
@QueryParam(TokenServiceParam.NAME) @DefaultValue(TokenServiceParam.DEFAULT)
final TokenServiceParam tokenService,
@QueryParam(NoRedirectParam.NAME) @DefaultValue(NoRedirectParam.DEFAULT)
final NoRedirectParam noredirect
final NoRedirectParam noredirect,
@QueryParam(StartAfterParam.NAME) @DefaultValue(StartAfterParam.DEFAULT)
final StartAfterParam startAfter
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, offset, length,
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
tokenKind, tokenService);
tokenKind, tokenService, startAfter);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
@ -836,7 +840,7 @@ public class NamenodeWebHdfsMethods {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
tokenService, noredirect);
tokenService, noredirect, startAfter);
} finally {
reset();
}
@ -861,7 +865,8 @@ public class NamenodeWebHdfsMethods {
final FsActionParam fsAction,
final TokenKindParam tokenKind,
final TokenServiceParam tokenService,
final NoRedirectParam noredirectParam
final NoRedirectParam noredirectParam,
final StartAfterParam startAfter
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final Configuration conf = (Configuration) context
@ -984,6 +989,16 @@ public class NamenodeWebHdfsMethods {
np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
return Response.ok().build();
}
case LISTSTATUS_BATCH:
{
byte[] start = HdfsFileStatus.EMPTY_NAME;
if (startAfter != null && startAfter.getValue() != null) {
start = startAfter.getValue().getBytes(Charsets.UTF_8);
}
final DirectoryListing listing = getDirectoryListing(np, fullpath, start);
final String js = JsonUtil.toJsonString(listing);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
default:
throw new UnsupportedOperationException(op + " is not supported");
}

View File

@ -96,6 +96,16 @@ public class JsonUtil {
if (status == null) {
return null;
}
final Map<String, Object> m = toJsonMap(status);
try {
return includeType ?
toJsonString(FileStatus.class, m) : MAPPER.writeValueAsString(m);
} catch (IOException ignored) {
}
return null;
}
private static Map<String, Object> toJsonMap(HdfsFileStatus status) {
final Map<String, Object> m = new TreeMap<String, Object>();
m.put("pathSuffix", status.getLocalName());
m.put("type", WebHdfsConstants.PathType.valueOf(status));
@ -121,12 +131,7 @@ public class JsonUtil {
m.put("fileId", status.getFileId());
m.put("childrenNum", status.getChildrenNum());
m.put("storagePolicy", status.getStoragePolicy());
try {
return includeType ?
toJsonString(FileStatus.class, m) : MAPPER.writeValueAsString(m);
} catch (IOException ignored) {
}
return null;
return m;
}
/** Convert an ExtendedBlock to a Json map. */
@ -227,6 +232,34 @@ public class JsonUtil {
return m;
}
public static String toJsonString(final DirectoryListing listing) throws
IOException {
if (listing == null) {
return null;
}
final Map<String, Object> m = new TreeMap<>();
m.put("partialListing", toJsonArray(listing.getPartialListing()));
m.put("remainingEntries", listing.getRemainingEntries());
return MAPPER.writeValueAsString(m);
}
private static Object[] toJsonArray(HdfsFileStatus[] statuses) throws
IOException {
if (statuses == null) {
return null;
}
if (statuses.length == 0) {
return EMPTY_OBJECT_ARRAY;
}
final Object[] a = new Object[statuses.length];
for (int i = 0; i < statuses.length; i++) {
a[i] = toJsonMap(statuses[i]);
}
return a;
}
/** Convert a LocatedBlock[] to a Json array. */
private static Object[] toJsonArray(final List<LocatedBlock> array
) throws IOException {

View File

@ -37,6 +37,7 @@ WebHDFS REST API
* [Truncate a File](#Truncate_a_File)
* [Status of a File/Directory](#Status_of_a_FileDirectory)
* [List a Directory](#List_a_Directory)
* [Iteratively List a Directory](#Iteratively_List_a_Directory)
* [Other File System Operations](#Other_File_System_Operations)
* [Get Content Summary of a Directory](#Get_Content_Summary_of_a_Directory)
* [Get File Checksum](#Get_File_Checksum)
@ -143,6 +144,7 @@ The HTTP REST API supports the complete [FileSystem](../../api/org/apache/hadoop
* [`OPEN`](#Open_and_Read_a_File) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).open)
* [`GETFILESTATUS`](#Status_of_a_FileDirectory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileStatus)
* [`LISTSTATUS`](#List_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listStatus)
* [`LISTSTATUS_BATCH`](#Iteratively_List_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listStatusIterator)
* [`GETCONTENTSUMMARY`](#Get_Content_Summary_of_a_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getContentSummary)
* [`GETFILECHECKSUM`](#Get_File_Checksum) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileChecksum)
* [`GETHOMEDIRECTORY`](#Get_Home_Directory) (see [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getHomeDirectory)
@ -590,6 +592,109 @@ See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).getFileSt
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listStatus
### Iteratively List a Directory
* Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS_BATCH&startAfter=<CHILD>"
The client receives a response with a batch of [`FileStatuses` JSON object](#FileStatuses_JSON_Schema), as well as iteration information:
HTTP/1.1 200 OK
Cache-Control: no-cache
Expires: Tue, 30 Aug 2016 16:42:16 GMT
Date: Tue, 30 Aug 2016 16:42:16 GMT
Pragma: no-cache
Expires: Tue, 30 Aug 2016 16:42:16 GMT
Date: Tue, 30 Aug 2016 16:42:16 GMT
Pragma: no-cache
Content-Type: application/json
X-FRAME-OPTIONS: SAMEORIGIN
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
{
"partialListing": [
{
"accessTime": 0,
"blockSize": 0,
"childrenNum": 0,
"fileId": 16389,
"group": "supergroup",
"length": 0,
"modificationTime": 1472575493064,
"owner": "andrew",
"pathSuffix": "anotherdir",
"permission": "755",
"replication": 0,
"storagePolicy": 0,
"type": "DIRECTORY"
},
{
"accessTime": 0,
"blockSize": 0,
"childrenNum": 0,
"fileId": 16386,
"group": "supergroup",
"length": 0,
"modificationTime": 1472575274776,
"owner": "andrew",
"pathSuffix": "somedir",
"permission": "755",
"replication": 0,
"storagePolicy": 0,
"type": "DIRECTORY"
}
],
"remainingEntries": 1
}
If `remainingEntries` is non-zero, there are additional entries in the directory.
To query the next batch, set the `startAfter` parameter to the `pathSuffix` of the last item returned in the current batch. For example:
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS_BATCH&startAfter=somedir"
Which will return the next batch of directory entries:
HTTP/1.1 200 OK
Cache-Control: no-cache
Expires: Tue, 30 Aug 2016 16:46:23 GMT
Date: Tue, 30 Aug 2016 16:46:23 GMT
Pragma: no-cache
Expires: Tue, 30 Aug 2016 16:46:23 GMT
Date: Tue, 30 Aug 2016 16:46:23 GMT
Pragma: no-cache
Content-Type: application/json
X-FRAME-OPTIONS: SAMEORIGIN
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
{
"partialListing": [
{
"accessTime": 1472575333568,
"blockSize": 1024,
"childrenNum": 0,
"fileId": 16388,
"group": "supergroup",
"length": 224,
"modificationTime": 1472575334222,
"owner": "andrew",
"pathSuffix": "somefile",
"permission": "644",
"replication": 3,
"storagePolicy": 0,
"type": "FILE"
}
],
"remainingEntries": 0
}
Batch size is controlled by the `dfs.ls.limit` option on the NameNode.
See also: [FileSystem](../../api/org/apache/hadoop/fs/FileSystem.html).listStatusIterator
Other File System Operations
----------------------------

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.web;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -50,6 +51,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
@ -299,11 +301,50 @@ public class TestWebHDFS {
WebHdfsConstants.WEBHDFS_SCHEME);
Path d = new Path("/my-dir");
Assert.assertTrue(fs.mkdirs(d));
for (int i=0; i < listLimit*3; i++) {
Path p = new Path(d, "file-"+i);
// Iterator should have no items when dir is empty
RemoteIterator<FileStatus> it = fs.listStatusIterator(d);
assertFalse(it.hasNext());
Path p = new Path(d, "file-"+0);
Assert.assertTrue(fs.createNewFile(p));
// Iterator should have an item when dir is not empty
it = fs.listStatusIterator(d);
assertTrue(it.hasNext());
it.next();
assertFalse(it.hasNext());
for (int i=1; i < listLimit*3; i++) {
p = new Path(d, "file-"+i);
Assert.assertTrue(fs.createNewFile(p));
}
Assert.assertEquals(listLimit*3, fs.listStatus(d).length);
// Check the FileStatus[] listing
FileStatus[] statuses = fs.listStatus(d);
Assert.assertEquals(listLimit*3, statuses.length);
// Check the iterator-based listing
GenericTestUtils.setLogLevel(WebHdfsFileSystem.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(NamenodeWebHdfsMethods.LOG, Level
.TRACE);
it = fs.listStatusIterator(d);
int count = 0;
while (it.hasNext()) {
FileStatus stat = it.next();
assertEquals("FileStatuses not equal", statuses[count], stat);
count++;
}
assertEquals("Different # of statuses!", statuses.length, count);
// Do some more basic iterator tests
it = fs.listStatusIterator(d);
// Try advancing the iterator without calling hasNext()
for (int i = 0; i < statuses.length; i++) {
FileStatus stat = it.next();
assertEquals("FileStatuses not equal", statuses[i], stat);
}
assertFalse("No more items expected", it.hasNext());
// Try doing next when out of items
try {
it.next();
fail("Iterator should error if out of elements.");
} catch (IllegalStateException e) {
// pass
}
return null;
}
});
@ -446,9 +487,9 @@ public class TestWebHDFS {
// delete the two snapshots
webHdfs.deleteSnapshot(foo, "s1");
Assert.assertFalse(webHdfs.exists(s1path));
assertFalse(webHdfs.exists(s1path));
webHdfs.deleteSnapshot(foo, spath.getName());
Assert.assertFalse(webHdfs.exists(spath));
assertFalse(webHdfs.exists(spath));
} finally {
if (cluster != null) {
cluster.shutdown();
@ -504,12 +545,12 @@ public class TestWebHDFS {
// rename s1 to s2
webHdfs.renameSnapshot(foo, "s1", "s2");
Assert.assertFalse(webHdfs.exists(s1path));
assertFalse(webHdfs.exists(s1path));
final Path s2path = SnapshotTestHelper.getSnapshotRoot(foo, "s2");
Assert.assertTrue(webHdfs.exists(s2path));
webHdfs.deleteSnapshot(foo, "s2");
Assert.assertFalse(webHdfs.exists(s2path));
assertFalse(webHdfs.exists(s2path));
} finally {
if (cluster != null) {
cluster.shutdown();

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.hdfs.web.resources.StartAfterParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
import org.apache.hadoop.hdfs.web.resources.FsActionParam;
@ -306,6 +307,30 @@ public class TestWebHdfsUrl {
},
checkAccessUrl);
}
@Test(timeout=60000)
public void testBatchedListingUrl() throws Exception {
Configuration conf = new Configuration();
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser("test-user");
UserGroupInformation.setLoginUser(ugi);
WebHdfsFileSystem webhdfs = getWebHdfsFileSystem(ugi, conf);
Path fsPath = new Path("/p1");
final StartAfterParam startAfter =
new StartAfterParam("last");
URL url = webhdfs.toUrl(GetOpParam.Op.LISTSTATUS_BATCH,
fsPath, startAfter);
checkQueryParams(
new String[]{
GetOpParam.Op.LISTSTATUS_BATCH.toQueryString(),
new UserParam(ugi.getShortUserName()).toString(),
StartAfterParam.NAME + "=" + "last"
},
url);
}
private void checkQueryParams(String[] expected, URL url) {
Arrays.sort(expected);

View File

@ -453,4 +453,11 @@ public class TestParam {
LOG.info("EXPECTED: " + e);
}
}
@Test
public void testStartAfterParam() throws Exception {
String s = "/helloWorld";
StartAfterParam param = new StartAfterParam(s);
Assert.assertEquals(s, param.getValue());
}
}