HDFS-10823. Implement HttpFSFileSystem#listStatusIterator.
Commit 8a40953058 (parent f6f3a447bf)
@@ -73,6 +73,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.Tracer;
 import org.apache.htrace.core.TraceScope;
 
+import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -1531,6 +1532,67 @@ public abstract class FileSystem extends Configured implements Closeable {
   public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
                                                          IOException;
 
+  /**
+   * Represents a batch of directory entries when iteratively listing a
+   * directory. This is a private API not meant for use by end users.
+   * <p>
+   * For internal use by FileSystem subclasses that override
+   * {@link FileSystem#listStatusBatch(Path, byte[])} to implement iterative
+   * listing.
+   */
+  @InterfaceAudience.Private
+  public static class DirectoryEntries {
+    private final FileStatus[] entries;
+    private final byte[] token;
+    private final boolean hasMore;
+
+    public DirectoryEntries(FileStatus[] entries, byte[] token, boolean
+        hasMore) {
+      this.entries = entries;
+      if (token != null) {
+        this.token = token.clone();
+      } else {
+        this.token = null;
+      }
+      this.hasMore = hasMore;
+    }
+
+    public FileStatus[] getEntries() {
+      return entries;
+    }
+
+    public byte[] getToken() {
+      return token;
+    }
+
+    public boolean hasMore() {
+      return hasMore;
+    }
+  }
+
+  /**
+   * Given an opaque iteration token, return the next batch of entries in a
+   * directory. This is a private API not meant for use by end users.
+   * <p>
+   * This method should be overridden by FileSystem subclasses that want to
+   * use the generic {@link FileSystem#listStatusIterator(Path)} implementation.
+   * @param f Path to list
+   * @param token opaque iteration token returned by previous call, or null
+   *              if this is the first call.
+   * @return
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  @InterfaceAudience.Private
+  protected DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    // The default implementation returns the entire listing as a single batch.
+    // Thus, there is never a second batch, and no need to respect the passed
+    // token or set a token in the returned DirectoryEntries.
+    FileStatus[] listing = listStatus(f);
+    return new DirectoryEntries(listing, null, false);
+  }
+
   /*
    * Filter files/directories in the given path using the user-supplied path
    * filter. Results are added to the given array <code>results</code>.
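The token contract documented above (pass null for the first call, then feed each batch's getToken() back in until hasMore() is false) is easiest to see as a loop. The following is an illustrative sketch only, not part of this commit; it borrows the FilterFileSystem trick that FSOperations uses later in the patch to reach the protected method, and with the default implementation above the whole listing simply arrives as a single batch.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FilterFileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: drains a directory batch by batch through the new
    // protected listStatusBatch API. Extending FilterFileSystem is just a
    // way to make the protected method callable from outside FileSystem.
    public class BatchListingSketch extends FilterFileSystem {

      public BatchListingSketch(FileSystem fs) {
        super(fs);
      }

      public List<FileStatus> drain(Path dir) throws IOException {
        List<FileStatus> all = new ArrayList<>();
        byte[] token = null;                  // null selects the first batch
        FileSystem.DirectoryEntries batch;
        do {
          batch = listStatusBatch(dir, token);
          for (FileStatus status : batch.getEntries()) {
            all.add(status);
          }
          token = batch.getToken();           // opaque resume point, if any
        } while (batch.hasMore());
        return all;
      }

      public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS in the default Configuration is reachable;
        // "/tmp" is only an example path.
        FileSystem fs = FileSystem.get(new Configuration());
        List<FileStatus> entries =
            new BatchListingSketch(fs).drain(new Path("/tmp"));
        System.out.println("Listed " + entries.size() + " entries");
      }
    }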
@@ -1766,6 +1828,49 @@ public abstract class FileSystem extends Configured implements Closeable {
     };
   }
 
+  /**
+   * Generic iterator for implementing {@link #listStatusIterator(Path)}.
+   */
+  private class DirListingIterator<T extends FileStatus> implements
+      RemoteIterator<T> {
+
+    private final Path path;
+    private DirectoryEntries entries;
+    private int i = 0;
+
+    DirListingIterator(Path path) {
+      this.path = path;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      if (entries == null) {
+        fetchMore();
+      }
+      return i < entries.getEntries().length ||
+          entries.hasMore();
+    }
+
+    private void fetchMore() throws IOException {
+      byte[] token = null;
+      if (entries != null) {
+        token = entries.getToken();
+      }
+      entries = listStatusBatch(path, token);
+      i = 0;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public T next() throws IOException {
+      Preconditions.checkState(hasNext(), "No more items in iterator");
+      if (i == entries.getEntries().length) {
+        fetchMore();
+      }
+      return (T)entries.getEntries()[i++];
+    }
+  }
+
   /**
    * Returns a remote iterator so that followup calls are made on demand
    * while consuming the entries. Each file system implementation should
@@ -1779,23 +1884,7 @@ public abstract class FileSystem extends Configured implements Closeable {
    */
   public RemoteIterator<FileStatus> listStatusIterator(final Path p)
   throws FileNotFoundException, IOException {
-    return new RemoteIterator<FileStatus>() {
-      private final FileStatus[] stats = listStatus(p);
-      private int i = 0;
-
-      @Override
-      public boolean hasNext() {
-        return i<stats.length;
-      }
-
-      @Override
-      public FileStatus next() throws IOException {
-        if (!hasNext()) {
-          throw new NoSuchElementException("No more entry in " + p);
-        }
-        return stats[i++];
-      }
-    };
+    return new DirListingIterator<>(p);
   }
 
   /**
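With listStatusIterator now backed by DirListingIterator, callers keep the same RemoteIterator-shaped code and further batches are only fetched as iteration proceeds, on file systems that override listStatusBatch. A minimal consumer sketch, assuming fs.defaultFS in the Configuration points at a reachable cluster and using /tmp purely as an example path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class ListStatusIteratorSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        RemoteIterator<FileStatus> it = fs.listStatusIterator(new Path("/tmp"));
        while (it.hasNext()) {
          // hasNext()/next() may trigger a listStatusBatch call for the next
          // page, which is why both are declared to throw IOException.
          FileStatus status = it.next();
          System.out.println(status.getPath() + "\t" + status.getLen());
        }
      }
    }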
@@ -103,6 +103,7 @@ public class TestFilterFileSystem {
     public void processDeleteOnExit();
     public FsStatus getStatus();
     public FileStatus[] listStatus(Path f, PathFilter filter);
+    public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
     public FileStatus[] globStatus(Path pathPattern);
@@ -115,6 +115,7 @@ public class TestHarFileSystem {
     public QuotaUsage getQuotaUsage(Path f);
     public FsStatus getStatus();
     public FileStatus[] listStatus(Path f, PathFilter filter);
+    public FileStatus[] listStatusBatch(Path f, byte[] token);
     public FileStatus[] listStatus(Path[] files);
     public FileStatus[] listStatus(Path[] files, PathFilter filter);
     public FileStatus[] globStatus(Path pathPattern);
@@ -64,7 +64,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
@@ -1504,55 +1503,30 @@ public class WebHdfsFileSystem extends FileSystem
   }
 
   private static final byte[] EMPTY_ARRAY = new byte[] {};
 
-  private class DirListingIterator<T extends FileStatus> implements
-      RemoteIterator<T> {
-
-    private final Path path;
-    private DirectoryListing thisListing;
-    private int i = 0;
-    private byte[] prevKey = EMPTY_ARRAY;
-
-    DirListingIterator(Path path) {
-      this.path = path;
-    }
-
   @Override
-    public boolean hasNext() throws IOException {
-      if (thisListing == null) {
-        fetchMore();
+  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    byte[] prevKey = EMPTY_ARRAY;
+    if (token != null) {
+      prevKey = token;
     }
-      return i < thisListing.getPartialListing().length ||
-          thisListing.hasMore();
-    }
-
-    private void fetchMore() throws IOException {
-      thisListing = new FsPathResponseRunner<DirectoryListing>(
+    DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
         GetOpParam.Op.LISTSTATUS_BATCH,
-          path, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
+        f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
       @Override
       DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
         return JsonUtilClient.toDirectoryListing(json);
       }
     }.run();
-      i = 0;
-      prevKey = thisListing.getLastName();
+    // Qualify the returned FileStatus array
+    final HdfsFileStatus[] statuses = listing.getPartialListing();
+    FileStatus[] qualified = new FileStatus[statuses.length];
+    for (int i = 0; i < statuses.length; i++) {
+      qualified[i] = makeQualified(statuses[i], f);
    }
-    @Override
-    @SuppressWarnings("unchecked")
-    public T next() throws IOException {
-      Preconditions.checkState(hasNext(), "No more items in iterator");
-      if (i == thisListing.getPartialListing().length) {
-        fetchMore();
-      }
-      return (T)makeQualified(thisListing.getPartialListing()[i++], path);
-    }
-  }
-
-  @Override
-  public RemoteIterator<FileStatus> listStatusIterator(final Path f)
-      throws FileNotFoundException, IOException {
-    return new DirListingIterator<>(f);
+    return new DirectoryEntries(qualified, listing.getLastName(),
+        listing.hasMore());
   }
 
   @Override
@@ -20,6 +20,8 @@ package org.apache.hadoop.fs.http.client;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+
+import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -111,6 +113,7 @@ public class HttpFSFileSystem extends FileSystem
   public static final String XATTR_SET_FLAG_PARAM = "flag";
   public static final String XATTR_ENCODING_PARAM = "encoding";
   public static final String NEW_LENGTH_PARAM = "newlength";
+  public static final String START_AFTER_PARAM = "startAfter";
 
   public static final Short DEFAULT_PERMISSION = 0755;
   public static final String ACLSPEC_DEFAULT = "";
@@ -184,6 +187,10 @@ public class HttpFSFileSystem extends FileSystem
 
   public static final String ENC_BIT_JSON = "encBit";
 
+  public static final String DIRECTORY_LISTING_JSON = "DirectoryListing";
+  public static final String PARTIAL_LISTING_JSON = "partialListing";
+  public static final String REMAINING_ENTRIES_JSON = "remainingEntries";
+
   public static final int HTTP_TEMPORARY_REDIRECT = 307;
 
   private static final String HTTP_GET = "GET";
@@ -203,7 +210,7 @@ public class HttpFSFileSystem extends FileSystem
     MODIFYACLENTRIES(HTTP_PUT), REMOVEACLENTRIES(HTTP_PUT),
     REMOVEDEFAULTACL(HTTP_PUT), REMOVEACL(HTTP_PUT), SETACL(HTTP_PUT),
     DELETE(HTTP_DELETE), SETXATTR(HTTP_PUT), GETXATTRS(HTTP_GET),
-    REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET);
+    REMOVEXATTR(HTTP_PUT), LISTXATTRS(HTTP_GET), LISTSTATUS_BATCH(HTTP_GET);
 
     private String httpMethod;
 
@@ -666,6 +673,17 @@ public class HttpFSFileSystem extends FileSystem
     return (Boolean) json.get(DELETE_JSON);
   }
 
+  private FileStatus[] toFileStatuses(JSONObject json, Path f) {
+    json = (JSONObject) json.get(FILE_STATUSES_JSON);
+    JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
+    FileStatus[] array = new FileStatus[jsonArray.size()];
+    f = makeQualified(f);
+    for (int i = 0; i < jsonArray.size(); i++) {
+      array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+    }
+    return array;
+  }
+
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
@@ -684,14 +702,36 @@ public class HttpFSFileSystem extends FileSystem
         params, f, true);
     HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
     JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
-    json = (JSONObject) json.get(FILE_STATUSES_JSON);
-    JSONArray jsonArray = (JSONArray) json.get(FILE_STATUS_JSON);
-    FileStatus[] array = new FileStatus[jsonArray.size()];
-    f = makeQualified(f);
-    for (int i = 0; i < jsonArray.size(); i++) {
-      array[i] = createFileStatus(f, (JSONObject) jsonArray.get(i));
+    return toFileStatuses(json, f);
   }
-    return array;
+
+  @Override
+  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+      FileNotFoundException, IOException {
+    Map<String, String> params = new HashMap<String, String>();
+    params.put(OP_PARAM, Operation.LISTSTATUS_BATCH.toString());
+    if (token != null) {
+      params.put(START_AFTER_PARAM, new String(token, Charsets.UTF_8));
+    }
+    HttpURLConnection conn = getConnection(
+        Operation.LISTSTATUS_BATCH.getMethod(),
+        params, f, true);
+    HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
+    // Parse the FileStatus array
+    JSONObject json = (JSONObject) HttpFSUtils.jsonParse(conn);
+    JSONObject listing = (JSONObject) json.get(DIRECTORY_LISTING_JSON);
+    FileStatus[] statuses = toFileStatuses(
+        (JSONObject) listing.get(PARTIAL_LISTING_JSON), f);
+    // New token is the last FileStatus entry
+    byte[] newToken = null;
+    if (statuses.length > 0) {
+      newToken = statuses[statuses.length - 1].getPath().getName().toString()
+          .getBytes(Charsets.UTF_8);
+    }
+    // Parse the remainingEntries boolean into hasMore
+    final long remainingEntries = (Long) listing.get(REMAINING_ENTRIES_JSON);
+    final boolean hasMore = remainingEntries > 0 ? true : false;
+    return new DirectoryEntries(statuses, newToken, hasMore);
   }
 
   /**
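The client method above expects a DirectoryListing object whose partialListing holds an ordinary FileStatuses document and whose remainingEntries count is collapsed into hasMore. The sketch below parses a hand-written payload the same way; the payload is illustrative and abridged, and the FileStatuses/FileStatus nesting is assumed to follow the usual WebHDFS listing format.

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class ListStatusBatchResponseSketch {
      public static void main(String[] args) {
        // Abridged, made-up response body; real entries carry many more fields.
        String body = "{\"DirectoryListing\":{"
            + "\"partialListing\":{\"FileStatuses\":{\"FileStatus\":["
            + "{\"pathSuffix\":\"file0\",\"type\":\"FILE\"},"
            + "{\"pathSuffix\":\"file1\",\"type\":\"FILE\"}]}},"
            + "\"remainingEntries\":1}}";

        JSONObject json = (JSONObject) JSONValue.parse(body);
        JSONObject listing = (JSONObject) json.get("DirectoryListing");
        JSONObject statuses = (JSONObject) listing.get("partialListing");
        JSONArray entries = (JSONArray)
            ((JSONObject) statuses.get("FileStatuses")).get("FileStatus");
        long remaining = (Long) listing.get("remainingEntries");

        // The last entry's name is what the client sends back as startAfter.
        String nextStartAfter = (String)
            ((JSONObject) entries.get(entries.size() - 1)).get("pathSuffix");
        System.out.println("entries=" + entries.size()
            + " hasMore=" + (remaining > 0)
            + " next startAfter=" + nextStartAfter);
      }
    }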
@@ -43,6 +43,8 @@ public class HttpFSUtils {
 
   public static final String SERVICE_VERSION = "/v1";
 
+  public static final byte[] EMPTY_BYTES = {};
+
   private static final String SERVICE_PATH = SERVICE_NAME + SERVICE_VERSION;
 
   /**
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.GlobFilter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -37,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -108,6 +110,27 @@ public class FSOperations {
     return json;
   }
 
+  /**
+   * Serializes a DirectoryEntries object into the JSON for a
+   * WebHDFS {@link org.apache.hadoop.hdfs.protocol.DirectoryListing}.
+   * <p>
+   * These two classes are slightly different, due to the impedance
+   * mismatches between the WebHDFS and FileSystem APIs.
+   * @param entries
+   * @return json
+   */
+  private static Map<String, Object> toJson(FileSystem.DirectoryEntries
+      entries) {
+    Map<String, Object> json = new LinkedHashMap<>();
+    Map<String, Object> inner = new LinkedHashMap<>();
+    Map<String, Object> fileStatuses = toJson(entries.getEntries());
+    inner.put(HttpFSFileSystem.PARTIAL_LISTING_JSON, fileStatuses);
+    inner.put(HttpFSFileSystem.REMAINING_ENTRIES_JSON, entries.hasMore() ? 1
+        : 0);
+    json.put(HttpFSFileSystem.DIRECTORY_LISTING_JSON, inner);
+    return json;
+  }
+
   /** Converts an <code>AclStatus</code> object into a JSON object.
    *
    * @param aclStatus AclStatus object
@@ -624,6 +647,45 @@ public class FSOperations {
 
   }
 
+  /**
+   * Executor that performs a batched directory listing.
+   */
+  @InterfaceAudience.Private
+  public static class FSListStatusBatch implements FileSystemAccess
+      .FileSystemExecutor<Map> {
+    private final Path path;
+    private final byte[] token;
+
+    public FSListStatusBatch(String path, byte[] token) throws IOException {
+      this.path = new Path(path);
+      this.token = token.clone();
+    }
+
+    /**
+     * Simple wrapper filesystem that exposes the protected batched
+     * listStatus API so we can use it.
+     */
+    private static class WrappedFileSystem extends FilterFileSystem {
+      public WrappedFileSystem(FileSystem f) {
+        super(f);
+      }
+
+      @Override
+      public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
+          FileNotFoundException, IOException {
+        return super.listStatusBatch(f, token);
+      }
+    }
+
+    @Override
+    public Map execute(FileSystem fs) throws IOException {
+      WrappedFileSystem wrappedFS = new WrappedFileSystem(fs);
+      FileSystem.DirectoryEntries entries =
+          wrappedFS.listStatusBatch(path, token);
+      return toJson(entries);
+    }
+  }
+
   /**
    * Executor that performs a mkdirs FileSystemAccess files system operation.
    */
@@ -91,6 +91,8 @@ public class HttpFSParametersProvider extends ParametersProvider {
     PARAMS_DEF.put(Operation.GETXATTRS,
         new Class[]{XAttrNameParam.class, XAttrEncodingParam.class});
     PARAMS_DEF.put(Operation.LISTXATTRS, new Class[]{});
+    PARAMS_DEF.put(Operation.LISTSTATUS_BATCH,
+        new Class[]{StartAfterParam.class});
   }
 
   public HttpFSParametersProvider() {
@@ -520,4 +522,22 @@ public class HttpFSParametersProvider extends ParametersProvider {
       super(NAME, XAttrCodec.class, null);
     }
   }
+
+  /**
+   * Class for startafter parameter.
+   */
+  @InterfaceAudience.Private
+  public static class StartAfterParam extends StringParam {
+    /**
+     * Parameter name.
+     */
+    public static final String NAME = HttpFSFileSystem.START_AFTER_PARAM;
+
+    /**
+     * Constructor.
+     */
+    public StartAfterParam() {
+      super(NAME, null);
+    }
+  }
 }
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.fs.http.server;
 
+import com.google.common.base.Charsets;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrCodec;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
+import org.apache.hadoop.fs.http.client.HttpFSUtils;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AccessTimeParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.AclPermissionParam;
 import org.apache.hadoop.fs.http.server.HttpFSParametersProvider.BlockSizeParam;
@@ -320,6 +322,21 @@ public class HttpFSServer {
       response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
       break;
     }
+    case LISTSTATUS_BATCH: {
+      String startAfter = params.get(
+          HttpFSParametersProvider.StartAfterParam.NAME,
+          HttpFSParametersProvider.StartAfterParam.class);
+      byte[] token = HttpFSUtils.EMPTY_BYTES;
+      if (startAfter != null) {
+        token = startAfter.getBytes(Charsets.UTF_8);
+      }
+      FSOperations.FSListStatusBatch command = new FSOperations
+          .FSListStatusBatch(path, token);
+      @SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
+      AUDIT_LOG.info("[{}] token [{}]", path, token);
+      response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
+      break;
+    }
     default: {
       throw new IOException(
         MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
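For reference, the case above serves a plain webhdfs-style GET, the same call HttpFSFileSystem issues. A hand-rolled request might look like the sketch below; the host, port, directory and startAfter value are placeholders, and authentication (for example a user.name parameter or delegation token) is omitted.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class RawListStatusBatchRequestSketch {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://httpfs-host:14000/webhdfs/v1/tmp"
            + "?op=LISTSTATUS_BATCH&startAfter=file4");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
            conn.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);  // the DirectoryListing JSON document
          }
        }
      }
    }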
@@ -84,7 +84,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
       count = 0;
     }
 
-    synchronized FileSystem getFileSytem(Configuration conf)
+    synchronized FileSystem getFileSystem(Configuration conf)
       throws IOException {
       if (fs == null) {
         fs = FileSystem.get(conf);
@@ -290,7 +290,7 @@ public class FileSystemAccessService extends BaseService implements FileSystemAc
     }
     Configuration conf = new Configuration(namenodeConf);
     conf.set(HTTPFS_FS_USER, user);
-    return cachedFS.getFileSytem(conf);
+    return cachedFS.getFileSystem(conf);
   }
 
   protected void closeFileSystem(FileSystem fs) throws IOException {
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -44,6 +45,7 @@ import org.apache.hadoop.test.TestHdfsHelper;
 import org.apache.hadoop.test.TestJetty;
 import org.apache.hadoop.test.TestJettyHelper;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -62,6 +64,7 @@ import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -136,14 +139,19 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     return "webhdfs";
   }
 
-  protected FileSystem getHttpFSFileSystem() throws Exception {
-    Configuration conf = new Configuration();
+  protected FileSystem getHttpFSFileSystem(Configuration conf) throws
+      Exception {
     conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
     URI uri = new URI(getScheme() + "://" +
                       TestJettyHelper.getJettyURL().toURI().getAuthority());
     return FileSystem.get(uri, conf);
   }
 
+  protected FileSystem getHttpFSFileSystem() throws Exception {
+    Configuration conf = new Configuration();
+    return getHttpFSFileSystem(conf);
+  }
+
   protected void testGet() throws Exception {
     FileSystem fs = getHttpFSFileSystem();
     Assert.assertNotNull(fs);
@@ -355,6 +363,51 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     assertEquals(stati[0].getPath().getName(), path.getName());
   }
 
+  private static void assertSameListing(FileSystem expected, FileSystem
+      actual, Path p) throws IOException {
+    // Consume all the entries from both iterators
+    RemoteIterator<FileStatus> exIt = expected.listStatusIterator(p);
+    List<FileStatus> exStatuses = new ArrayList<>();
+    while (exIt.hasNext()) {
+      exStatuses.add(exIt.next());
+    }
+    RemoteIterator<FileStatus> acIt = actual.listStatusIterator(p);
+    List<FileStatus> acStatuses = new ArrayList<>();
+    while (acIt.hasNext()) {
+      acStatuses.add(acIt.next());
+    }
+    assertEquals(exStatuses.size(), acStatuses.size());
+    for (int i = 0; i < exStatuses.size(); i++) {
+      FileStatus expectedStatus = exStatuses.get(i);
+      FileStatus actualStatus = acStatuses.get(i);
+      // Path URIs are fully qualified, so compare just the path component
+      assertEquals(expectedStatus.getPath().toUri().getPath(),
+          actualStatus.getPath().toUri().getPath());
+    }
+  }
+
+  private void testListStatusBatch() throws Exception {
+    // LocalFileSystem writes checksum files next to the data files, which
+    // show up when listing via LFS. This makes the listings not compare
+    // properly.
+    Assume.assumeFalse(isLocalFS());
+
+    FileSystem proxyFs = FileSystem.get(getProxiedFSConf());
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
+    FileSystem httpFs = getHttpFSFileSystem(conf);
+
+    // Test an empty directory
+    Path dir = new Path(getProxiedFSTestDir(), "dir");
+    proxyFs.mkdirs(dir);
+    assertSameListing(proxyFs, httpFs, dir);
+    // Create and test in a loop
+    for (int i = 0; i < 10; i++) {
+      proxyFs.create(new Path(dir, "file" + i)).close();
+      assertSameListing(proxyFs, httpFs, dir);
+    }
+  }
+
   private void testWorkingdirectory() throws Exception {
     FileSystem fs = FileSystem.get(getProxiedFSConf());
     Path workingDir = fs.getWorkingDirectory();
@@ -863,7 +916,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     GET, OPEN, CREATE, APPEND, TRUNCATE, CONCAT, RENAME, DELETE, LIST_STATUS,
     WORKING_DIRECTORY, MKDIRS, SET_TIMES, SET_PERMISSION, SET_OWNER,
     SET_REPLICATION, CHECKSUM, CONTENT_SUMMARY, FILEACLS, DIRACLS, SET_XATTR,
-    GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION
+    GET_XATTRS, REMOVE_XATTR, LIST_XATTRS, ENCRYPTION, LIST_STATUS_BATCH
   }
 
   private void operation(Operation op) throws Exception {
@@ -940,6 +993,9 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
     case ENCRYPTION:
       testEncryption();
       break;
+    case LIST_STATUS_BATCH:
+      testListStatusBatch();
+      break;
     }
   }
 