diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 52b94845264..6b068025427 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TraceScope;
+import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import static com.google.common.base.Preconditions.checkArgument;
@@ -1529,7 +1530,68 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException;
-
+
+ /**
+ * Represents a batch of directory entries when iteratively listing a
+ * directory. This is a private API not meant for use by end users.
+ *
+ * For internal use by FileSystem subclasses that override
+ * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
+ * listing.
+ */
+ @InterfaceAudience.Private
+ public static class DirectoryEntries {
+ private final FileStatus[] entries;
+ private final byte[] token;
+ private final boolean hasMore;
+
+ public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
+ hasMore) {
+ this.entries = entries;
+ if (token != null) {
+ this.token = token.clone();
+ } else {
+ this.token = null;
+ }
+ this.hasMore = hasMore;
+ }
+
+ public FileStatus[] getEntries() {
+ return entries;
+ }
+
+ public byte[] getToken() {
+ return token;
+ }
+
+ public boolean hasMore() {
+ return hasMore;
+ }
+ }
+
+ /**
+ * Given an opaque iteration token, return the next batch of entries in a
+ * directory. This is a private API not meant for use by end users.
+ *
+ * This method should be overridden by FileSystem subclasses that want to
+ * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
+ * @param f Path to list
+ * @param token opaque iteration token returned by previous call, or null
+ * if this is the first call.
+ * @return
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ @InterfaceAudience.Private
+ protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ // The default implementation returns the entire listing as a single batch.
+ // Thus, there is never a second batch, and no need to respect the passed
+ // token or set a token in the returned DirectoryEntries.
+ FileStatus[] listing = listStatus(f);
+ return new DirectoryEntries(listing, null, false);
+ }
+
/*
* Filter files/directories in the given path using the user-supplied path
* filter. Results are added to the given array results
.
@@ -1766,6 +1828,49 @@ public abstract class FileSystem extends Configured implements Closeable {
};
}
+ /**
+ * Generic iterator for implementing {@link #listStatusIterator(Path)}.
+ */
+ private class DirListingIterator implements
+ RemoteIterator {
+
+ private final Path path;
+ private DirectoryEntries entries;
+ private int i = 0;
+
+ DirListingIterator(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (entries == null) {
+ fetchMore();
+ }
+ return i < entries.getEntries().length ||
+ entries.hasMore();
+ }
+
+ private void fetchMore() throws IOException {
+ byte[] token = null;
+ if (entries != null) {
+ token = entries.getToken();
+ }
+ entries = listStatusBatch(path, token);
+ i = 0;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T next() throws IOException {
+ Preconditions.checkState(hasNext(), "No more items in iterator");
+ if (i == entries.getEntries().length) {
+ fetchMore();
+ }
+ return (T)entries.getEntries()[i++];
+ }
+ }
+
/**
* Returns a remote iterator so that followup calls are made on demand
* while consuming the entries. Each file system implementation should
@@ -1779,23 +1884,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public RemoteIterator listStatusIterator(final Path p)
throws FileNotFoundException, IOException {
- return new RemoteIterator() {
- private final FileStatus[] stats = listStatus(p);
- private int i = 0;
-
- @Override
- public boolean hasNext() {
- return i(p);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index 1282af94adb..76edf5e9233 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -103,6 +103,7 @@ public class TestFilterFileSystem {
public void processDeleteOnExit();
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
+ public FileStatus[] listStatusBatch(Path f, byte[] token);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
public FileStatus[] globStatus(Path pathPattern);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index d2020b9b72d..bacdbb73e45 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -115,6 +115,7 @@ public class TestHarFileSystem {
public QuotaUsage getQuotaUsage(Path f);
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
+ public FileStatus[] listStatusBatch(Path f, byte[] token);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
public FileStatus[] globStatus(Path pathPattern);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
index ad7b5cf3a57..92e6901c91e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@@ -1491,55 +1490,30 @@ public class WebHdfsFileSystem extends FileSystem
}
private static final byte[] EMPTY_ARRAY = new byte[] {};
- private class DirListingIterator implements
- RemoteIterator {
-
- private final Path path;
- private DirectoryListing thisListing;
- private int i = 0;
- private byte[] prevKey = EMPTY_ARRAY;
-
- DirListingIterator(Path path) {
- this.path = path;
- }
-
- @Override
- public boolean hasNext() throws IOException {
- if (thisListing == null) {
- fetchMore();
- }
- return i < thisListing.getPartialListing().length ||
- thisListing.hasMore();
- }
-
- private void fetchMore() throws IOException {
- thisListing = new FsPathResponseRunner(
- GetOpParam.Op.LISTSTATUS_BATCH,
- path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
- @Override
- DirectoryListing decodeResponse(Map, ?> json) throws IOException {
- return JsonUtilClient.toDirectoryListing(json);
- }
- }.run();
- i = 0;
- prevKey = thisListing.getLastName();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public T next() throws IOException {
- Preconditions.checkState(hasNext(), "No more items in iterator");
- if (i == thisListing.getPartialListing().length) {
- fetchMore();
- }
- return (T)makeQualified(thisListing.getPartialListing()[i++], path);
- }
- }
@Override
- public RemoteIterator listStatusIterator(final Path f)
- throws FileNotFoundException, IOException {
- return new DirListingIterator<>(f);
+ public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ byte[] prevKey = EMPTY_ARRAY;
+ if (token != null) {
+ prevKey = token;
+ }
+ DirectoryListing listing = new FsPathResponseRunner(
+ GetOpParam.Op.LISTSTATUS_BATCH,
+ f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
+ @Override
+ DirectoryListing decodeResponse(Map, ?> json) throws IOException {
+ return JsonUtilClient.toDirectoryListing(json);
+ }
+ }.run();
+ // Qualify the returned FileStatus array
+ final HdfsFileStatus[] statuses = listing.getPartialListing();
+ FileStatus[] qualified = new FileStatus[statuses.length];
+ for (int i = 0; i < statuses.length; i++) {
+ qualified[i] = makeQualified(statuses[i], f);
+ }
+ return new DirectoryEntries(qualified, listing.getLastName(),
+ listing.hasMore());
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
index d820ec5cf86..74d19a299d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSFileSystem.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+
+import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@@ -112,6 +114,7 @@ public class HttpFSFileSystem extends FileSystem
public static final String XATTR_SET_FLAG_PARAM = "flag";
public static final String XATTR_ENCODING_PARAM = "encoding";
public static final String NEW_LENGTH_PARAM = "newlength";
+ public static final String START_AFTER_PARAM = "startAfter";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@@ -185,6 +188,10 @@ public class HttpFSFileSystem extends FileSystem
public static final String ENC_BIT_JSON = "encBit";
+ public static final String DIRECTORY_LISTING_JSON = "DirectoryListing";
+ public static final String PARTIAL_LISTING_JSON = "partialListing";
+ public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
+
public static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final String HTTP_GET = "GET";
@@ -204,7 +211,7 @@ public class HttpFSFileSystem extends FileSystem
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
- REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
+ REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
private String httpMethod;
@@ -668,6 +675,17 @@ public class HttpFSFileSystem extends FileSystem
return (Boolean) json.get(DELETE_JSON);
}
+ private FileStatus[] toFileStatuses(JSONObject json, Path f) {
+ json = (JSONObject) json.get(FILE_STATUSES_JSON);
+ JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
+ FileStatus[] array = new FileStatus[jsonArray.size()];
+ f = makeQualified(f);
+ for (int i = 0; i < jsonArray.size(); i++) {
+ array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+ }
+ return array;
+ }
+
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
@@ -686,14 +704,36 @@ public class HttpFSFileSystem extends FileSystem
params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
- json = (JSONObject) json.get(FILE_STATUSES_JSON);
- JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
- FileStatus[] array = new FileStatus[jsonArray.size()];
- f = makeQualified(f);
- for (int i = 0; i < jsonArray.size(); i++) {
- array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+ return toFileStatuses(json, f);
+ }
+
+ @Override
+ public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+ FileNotFoundException, IOException {
+ Map params = new HashMap();
+ params.put(OP_PARAM, Operation.LISTSTATUS_BATCH.toString());
+ if (token != null) {
+ params.put(START_AFTER_PARAM, new String(token, Charsets.UTF_8));
}
- return array;
+ HttpURLConnection conn = getConnection(
+ Operation.LISTSTATUS_BATCH.getMethod(),
+ params, f, true);
+ HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+ // Parse the FileStatus array
+ JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+ JSONObject listing = (JSONObject) json.get(DIRECTORY_LISTING_JSON);
+ FileStatus[] statuses = toFileStatuses(
+ (JSONObject) listing.get(PARTIAL_LISTING_JSON), f);
+ // New token is the last FileStatus entry
+ byte[] newToken = null;
+ if (statuses.length > 0) {
+ newToken = statuses[statuses.length - 1].getPath().getName().toString()
+ .getBytes(Charsets.UTF_8);
+ }
+ // Parse the remainingEntries boolean into hasMore
+ final long remainingEntries = (Long) listing.get(REMAINING_ENTRIES_JSON);
+ final boolean hasMore = remainingEntries > 0 ? true : false;
+ return new DirectoryEntries(statuses, newToken, hasMore);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
index 95e26d799f1..fcc7bab15e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/HttpFSUtils.java
@@ -43,6 +43,8 @@ public class HttpFSUtils {
public static final String SERVICE_VERSION = "/v1";
+ public static final byte[] EMPTY_BYTES = {};
+
private static final String SERVICE_PATH = SERVICE_NAME + SERVICE_VERSION;
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
index 39597ebb120..46948f96b15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/FSOperations.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -37,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -108,6 +110,27 @@ public class FSOperations {
return json;
}
+ /**
+ * Serializes a DirectoryEntries object into the JSON for a
+ * WebHDFS {@link org.apache.hadoop.hdfs.protocol.DirectoryListing}.
+ *
+ * These two classes are slightly different, due to the impedance
+ * mismatches between the WebHDFS and FileSystem APIs.
+ * @param entries
+ * @return json
+ */
+ private static Map toJson(FileSystem.DirectoryEntries
+ entries) {
+ Map json = new LinkedHashMap<>();
+ Map inner = new LinkedHashMap<>();
+ Map fileStatuses = toJson(entries.getEntries());
+ inner.put(HttpFSFileSystem.PARTIAL_LISTING_JSON, fileStatuses);
+ inner.put(HttpFSFileSystem.REMAINING_ENTRIES_JSON, entries.hasMore() ? 1
+ : 0);
+ json.put(HttpFSFileSystem.DIRECTORY_LISTING_JSON, inner);
+ return json;
+ }
+
/** Converts an AclStatus
object into a JSON object.
*
* @param aclStatus AclStatus object
@@ -624,6 +647,45 @@ public class FSOperations {
}
+ /**
+ * Executor that performs a batched directory listing.
+ */
+ @InterfaceAudience.Private
+ public static class FSListStatusBatch implements FileSystemAccess
+ .FileSystemExecutor