HDFS-10823. Implement HttpFSFileSystem#listStatusIterator.

(cherry picked from commit 8a40953058)
This commit is contained in:
Andrew Wang 2016-09-16 15:37:36 -07:00
parent 031d5f6c5b
commit b03a0be7a3
11 changed files with 341 additions and 79 deletions

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.Tracer;
import org.apache.htrace.core.TraceScope;
import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import static com.google.common.base.Preconditions.checkArgument;
@ -1529,7 +1530,68 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException;
/**
* Represents a batch of directory entries when iteratively listing a
* directory. This is a private API not meant for use by end users.
* <p>
* For internal use by FileSystem subclasses that override
* {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
* listing.
*/
@InterfaceAudience.Private
public static class DirectoryEntries {
private final FileStatus[] entries;
private final byte[] token;
private final boolean hasMore;
public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
hasMore) {
this.entries = entries;
if (token != null) {
this.token = token.clone();
} else {
this.token = null;
}
this.hasMore = hasMore;
}
public FileStatus[] getEntries() {
return entries;
}
public byte[] getToken() {
return token;
}
public boolean hasMore() {
return hasMore;
}
}
/**
* Given an opaque iteration token, return the next batch of entries in a
* directory. This is a private API not meant for use by end users.
* <p>
* This method should be overridden by FileSystem subclasses that want to
* use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
* @param f Path to list
* @param token opaque iteration token returned by previous call, or null
* if this is the first call.
* @return
* @throws FileNotFoundException
* @throws IOException
*/
@InterfaceAudience.Private
protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
FileNotFoundException, IOException {
// The default implementation returns the entire listing as a single batch.
// Thus, there is never a second batch, and no need to respect the passed
// token or set a token in the returned DirectoryEntries.
FileStatus[] listing = listStatus(f);
return new DirectoryEntries(listing, null, false);
}
/*
* Filter files/directories in the given path using the user-supplied path
* filter. Results are added to the given array <code>results</code>.
@ -1766,6 +1828,49 @@ public abstract class FileSystem extends Configured implements Closeable {
};
}
/**
* Generic iterator for implementing {@link #listStatusIterator(Path)}.
*/
private class DirListingIterator<T extends FileStatus> implements
RemoteIterator<T> {
private final Path path;
private DirectoryEntries entries;
private int i = 0;
DirListingIterator(Path path) {
this.path = path;
}
@Override
public boolean hasNext() throws IOException {
if (entries == null) {
fetchMore();
}
return i < entries.getEntries().length ||
entries.hasMore();
}
private void fetchMore() throws IOException {
byte[] token = null;
if (entries != null) {
token = entries.getToken();
}
entries = listStatusBatch(path, token);
i = 0;
}
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
Preconditions.checkState(hasNext(), "No more items in iterator");
if (i == entries.getEntries().length) {
fetchMore();
}
return (T)entries.getEntries()[i++];
}
}
/**
* Returns a remote iterator so that followup calls are made on demand
* while consuming the entries. Each file system implementation should
@ -1779,23 +1884,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws FileNotFoundException, IOException {
return new RemoteIterator<FileStatus>() {
private final FileStatus[] stats = listStatus(p);
private int i = 0;
@Override
public boolean hasNext() {
return i<stats.length;
}
@Override
public FileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more entry in " + p);
}
return stats[i++];
}
};
return new DirListingIterator<>(p);
}
/**

View File

@ -103,6 +103,7 @@ public class TestFilterFileSystem {
public void processDeleteOnExit();
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
public FileStatus[] listStatusBatch(Path f, byte[] token);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
public FileStatus[] globStatus(Path pathPattern);

View File

@ -115,6 +115,7 @@ public class TestHarFileSystem {
public QuotaUsage getQuotaUsage(Path f);
public FsStatus getStatus();
public FileStatus[] listStatus(Path f, PathFilter filter);
public FileStatus[] listStatusBatch(Path f, byte[] token);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files, PathFilter filter);
public FileStatus[] globStatus(Path pathPattern);

View File

@ -64,7 +64,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@ -1491,55 +1490,30 @@ public class WebHdfsFileSystem extends FileSystem
}
private static final byte[] EMPTY_ARRAY = new byte[] {};
private class DirListingIterator<T extends FileStatus> implements
RemoteIterator<T> {
private final Path path;
private DirectoryListing thisListing;
private int i = 0;
private byte[] prevKey = EMPTY_ARRAY;
DirListingIterator(Path path) {
this.path = path;
}
@Override
public boolean hasNext() throws IOException {
if (thisListing == null) {
fetchMore();
}
return i < thisListing.getPartialListing().length ||
thisListing.hasMore();
}
private void fetchMore() throws IOException {
thisListing = new FsPathResponseRunner<DirectoryListing>(
GetOpParam.Op.LISTSTATUS_BATCH,
path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
@Override
DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toDirectoryListing(json);
}
}.run();
i = 0;
prevKey = thisListing.getLastName();
}
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
Preconditions.checkState(hasNext(), "No more items in iterator");
if (i == thisListing.getPartialListing().length) {
fetchMore();
}
return (T)makeQualified(thisListing.getPartialListing()[i++], path);
}
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path f)
throws FileNotFoundException, IOException {
return new DirListingIterator<>(f);
public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
FileNotFoundException, IOException {
byte[] prevKey = EMPTY_ARRAY;
if (token != null) {
prevKey = token;
}
DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
GetOpParam.Op.LISTSTATUS_BATCH,
f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
@Override
DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
return JsonUtilClient.toDirectoryListing(json);
}
}.run();
// Qualify the returned FileStatus array
final HdfsFileStatus[] statuses = listing.getPartialListing();
FileStatus[] qualified = new FileStatus[statuses.length];
for (int i = 0; i < statuses.length; i++) {
qualified[i] = makeQualified(statuses[i], f);
}
return new DirectoryEntries(qualified, listing.getLastName(),
listing.hasMore());
}
@Override

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.fs.http.client;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
@ -112,6 +114,7 @@ public class HttpFSFileSystem extends FileSystem
public static final String XATTR_SET_FLAG_PARAM = "flag";
public static final String XATTR_ENCODING_PARAM = "encoding";
public static final String NEW_LENGTH_PARAM = "newlength";
public static final String START_AFTER_PARAM = "startAfter";
public static final Short DEFAULT_PERMISSION = 0755;
public static final String ACLSPEC_DEFAULT = "";
@ -185,6 +188,10 @@ public class HttpFSFileSystem extends FileSystem
public static final String ENC_BIT_JSON = "encBit";
public static final String DIRECTORY_LISTING_JSON = "DirectoryListing";
public static final String PARTIAL_LISTING_JSON = "partialListing";
public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
public static final int HTTP_TEMPORARY_REDIRECT = 307;
private static final String HTTP_GET = "GET";
@ -204,7 +211,7 @@ public class HttpFSFileSystem extends FileSystem
MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
private String httpMethod;
@ -668,6 +675,17 @@ public class HttpFSFileSystem extends FileSystem
return (Boolean) json.get(DELETE_JSON);
}
private FileStatus[] toFileStatuses(JSONObject json, Path f) {
json = (JSONObject) json.get(FILE_STATUSES_JSON);
JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
FileStatus[] array = new FileStatus[jsonArray.size()];
f = makeQualified(f);
for (int i = 0; i < jsonArray.size(); i++) {
array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
}
return array;
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
@ -686,14 +704,36 @@ public class HttpFSFileSystem extends FileSystem
params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
json = (JSONObject) json.get(FILE_STATUSES_JSON);
JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
FileStatus[] array = new FileStatus[jsonArray.size()];
f = makeQualified(f);
for (int i = 0; i < jsonArray.size(); i++) {
array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
return toFileStatuses(json, f);
}
@Override
public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
FileNotFoundException, IOException {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, Operation.LISTSTATUS_BATCH.toString());
if (token != null) {
params.put(START_AFTER_PARAM, new String(token, Charsets.UTF_8));
}
return array;
HttpURLConnection conn = getConnection(
Operation.LISTSTATUS_BATCH.getMethod(),
params, f, true);
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
// Parse the FileStatus array
JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
JSONObject listing = (JSONObject) json.get(DIRECTORY_LISTING_JSON);
FileStatus[] statuses = toFileStatuses(
(JSONObject) listing.get(PARTIAL_LISTING_JSON), f);
// New token is the last FileStatus entry
byte[] newToken = null;
if (statuses.length > 0) {
newToken = statuses[statuses.length - 1].getPath().getName().toString()
.getBytes(Charsets.UTF_8);
}
// Parse the remainingEntries boolean into hasMore
final long remainingEntries = (Long) listing.get(REMAINING_ENTRIES_JSON);
final boolean hasMore = remainingEntries > 0 ? true : false;
return new DirectoryEntries(statuses, newToken, hasMore);
}
/**

View File

@ -43,6 +43,8 @@ public class HttpFSUtils {
public static final String SERVICE_VERSION = "/v1";
public static final byte[] EMPTY_BYTES = {};
private static final String SERVICE_PATH = SERVICE_NAME + SERVICE_VERSION;
/**

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@ -37,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -108,6 +110,27 @@ public class FSOperations {
return json;
}
/**
* Serializes a DirectoryEntries object into the JSON for a
* WebHDFS {@link org.apache.hadoop.hdfs.protocol.DirectoryListing}.
* <p>
* These two classes are slightly different, due to the impedance
* mismatches between the WebHDFS and FileSystem APIs.
* @param entries
* @return json
*/
private static Map<String, Object> toJson(FileSystem.DirectoryEntries
entries) {
Map<String, Object> json = new LinkedHashMap<>();
Map<String, Object> inner = new LinkedHashMap<>();
Map<String, Object> fileStatuses = toJson(entries.getEntries());
inner.put(HttpFSFileSystem.PARTIAL_LISTING_JSON, fileStatuses);
inner.put(HttpFSFileSystem.REMAINING_ENTRIES_JSON, entries.hasMore() ? 1
: 0);
json.put(HttpFSFileSystem.DIRECTORY_LISTING_JSON, inner);
return json;
}
/** Converts an <code>AclStatus</code> object into a JSON object.
*
* @param aclStatus AclStatus object
@ -624,6 +647,45 @@ public class FSOperations {
}
/**
* Executor that performs a batched directory listing.
*/
@InterfaceAudience.Private
public static class FSListStatusBatch implements FileSystemAccess
.FileSystemExecutor<Map> {
private final Path path;
private final byte[] token;
public FSListStatusBatch(String path, byte[] token) throws IOException {
this.path = new Path(path);
this.token = token.clone();
}
/**
* Simple wrapper filesystem that exposes the protected batched
* listStatus API so we can use it.
*/
private static class WrappedFileSystem extends FilterFileSystem {
public WrappedFileSystem(FileSystem f) {
super(f);
}
@Override
public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
FileNotFoundException, IOException {
return super.listStatusBatch(f, token);
}
}
@Override
public Map execute(FileSystem fs) throws IOException {
WrappedFileSystem wrappedFS = new WrappedFileSystem(fs);
FileSystem.DirectoryEntries entries =
wrappedFS.listStatusBatch(path, token);
return toJson(entries);
}
}
/**
* Executor that performs a mkdirs FileSystemAccess files system operation.
*/

View File

@ -91,6 +91,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
PARAMS_DEF.put(Operation.GETXATTRS,
new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
PARAMS_DEF.put(Operation.LISTSTATUS_BATCH,
new Class[]{StartAfterParam.class});
}
public HttpFSParametersProvider() {
@ -520,4 +522,22 @@ public class HttpFSParametersProvider extends ParametersProvider {
super(NAME, XAttrCodec.class, null);
}
}
/**
* Class for startafter parameter.
*/
@InterfaceAudience.Private
public static class StartAfterParam extends StringParam {
/**
* Parameter name.
*/
public static final String NAME = HttpFSFileSystem.START_AFTER_PARAM;
/**
* Constructor.
*/
public StartAfterParam() {
super(NAME, null);
}
}
}

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.fs.http.server;
import com.google.common.base.Charsets;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.client.HttpFSUtils;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
@ -320,6 +322,21 @@ public class HttpFSServer {
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case LISTSTATUS_BATCH: {
String startAfter = params.get(
HttpFSParametersProvider.StartAfterParam.NAME,
HttpFSParametersProvider.StartAfterParam.class);
byte[] token = HttpFSUtils.EMPTY_BYTES;
if (startAfter != null) {
token = startAfter.getBytes(Charsets.UTF_8);
}
FSOperations.FSListStatusBatch command = new FSOperations
.FSListStatusBatch(path, token);
@SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}] token [{}]", path, token);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));

View File

@ -84,7 +84,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
count = 0;
}
synchronized FileSystem getFileSytem(Configuration conf)
synchronized FileSystem getFileSystem(Configuration conf)
throws IOException {
if (fs == null) {
fs = FileSystem.get(conf);
@ -290,7 +290,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
}
Configuration conf = new Configuration(namenodeConf);
conf.set(HTTPFS_FS_USER, user);
return cachedFS.getFileSytem(conf);
return cachedFS.getFileSystem(conf);
}
protected void closeFileSystem(FileSystem fs) throws IOException {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@ -44,6 +45,7 @@ import org.apache.hadoop.test.TestHdfsHelper;
import org.apache.hadoop.test.TestJetty;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -62,6 +64,7 @@ import java.io.Writer;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -136,14 +139,19 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
return "webhdfs";
}
protected FileSystem getHttpFSFileSystem() throws Exception {
Configuration conf = new Configuration();
protected FileSystem getHttpFSFileSystem(Configuration conf) throws
Exception {
conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
URI uri = new URI(getScheme() + "://" +
TestJettyHelper.getJettyURL().toURI().getAuthority());
return FileSystem.get(uri, conf);
}
protected FileSystem getHttpFSFileSystem() throws Exception {
Configuration conf = new Configuration();
return getHttpFSFileSystem(conf);
}
protected void testGet() throws Exception {
FileSystem fs = getHttpFSFileSystem();
Assert.assertNotNull(fs);
@ -355,6 +363,51 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
assertEquals(stati[0].getPath().getName(), path.getName());
}
private static void assertSameListing(FileSystem expected, FileSystem
actual, Path p) throws IOException {
// Consume all the entries from both iterators
RemoteIterator<FileStatus> exIt = expected.listStatusIterator(p);
List<FileStatus> exStatuses = new ArrayList<>();
while (exIt.hasNext()) {
exStatuses.add(exIt.next());
}
RemoteIterator<FileStatus> acIt = actual.listStatusIterator(p);
List<FileStatus> acStatuses = new ArrayList<>();
while (acIt.hasNext()) {
acStatuses.add(acIt.next());
}
assertEquals(exStatuses.size(), acStatuses.size());
for (int i = 0; i < exStatuses.size(); i++) {
FileStatus expectedStatus = exStatuses.get(i);
FileStatus actualStatus = acStatuses.get(i);
// Path URIs are fully qualified, so compare just the path component
assertEquals(expectedStatus.getPath().toUri().getPath(),
actualStatus.getPath().toUri().getPath());
}
}
private void testListStatusBatch() throws Exception {
// LocalFileSystem writes checksum files next to the data files, which
// show up when listing via LFS. This makes the listings not compare
// properly.
Assume.assumeFalse(isLocalFS());
FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
FileSystem httpFs = getHttpFSFileSystem(conf);
// Test an empty directory
Path dir = new Path(getProxiedFSTestDir(), "dir");
proxyFs.mkdirs(dir);
assertSameListing(proxyFs, httpFs, dir);
// Create and test in a loop
for (int i = 0; i < 10; i++) {
proxyFs.create(new Path(dir, "file" + i)).close();
assertSameListing(proxyFs, httpFs, dir);
}
}
private void testWorkingdirectory() throws Exception {
FileSystem fs = FileSystem.get(getProxiedFSConf());
Path workingDir = fs.getWorkingDirectory();
@ -863,7 +916,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION
GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH
}
private void operation(Operation op) throws Exception {
@ -940,6 +993,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
case ENCRYPTION:
testEncryption();
break;
case LIST_STATUS_BATCH:
testListStatusBatch();
break;
}
}