HADOOP-6223. Add new file system interface AbstractFileSystem with implementations of some file systems that delegate to the old FileSystem. Contributed by Sanjay Radia.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@831475 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2009-10-30 22:24:22 +00:00
parent b5c31f4ee6
commit 3f371a0a64
19 changed files with 2381 additions and 253 deletions

View File

@ -249,6 +249,10 @@ Release 0.21.0 - Unreleased
HADOOP-6313. Implement Syncable interface in FSDataOutputStream to expose
flush APIs to application users. (Hairong Kuang via suresh)
HADOOP-6223. Add new file system interface AbstractFileSystem with
implementations of some file systems that delegate to the old FileSystem.
(Sanjay Radia via suresh)
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information

View File

@ -162,6 +162,19 @@
<description>The FileSystem for hdfs: uris.</description>
</property>
<property>
<name>fs.AbstractFileSystem.file.impl</name>
<value>org.apache.hadoop.fs.local.LocalFs</value>
<description>The AbstractFileSystem for file: uris.</description>
</property>
<property>
<name>fs.AbstractFileSystem.hdfs.impl</name>
<value>org.apache.hadoop.fs.Hdfs</value>
<description>The AbstractFileSystem for hdfs: uris.</description>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3.S3FileSystem</value>

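A minimal usage sketch of the new keys (the namenode:8020 authority and /tmp/demo path are assumed for illustration): FileContext maps the scheme of the default URI to the fs.AbstractFileSystem.<scheme>.impl class declared above.

Configuration conf = new Configuration();
// "hdfs" scheme -> fs.AbstractFileSystem.hdfs.impl -> org.apache.hadoop.fs.Hdfs
FileContext fc = FileContext.getFileContext(URI.create("hdfs://namenode:8020"), conf);
fc.mkdir(new Path("/tmp/demo"), FsPermission.getDefault(), true);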
View File

@ -0,0 +1,665 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
/**
* This class provides an interface for implementors of a Hadoop filesystem
* (analogous to the VFS of Unix). Applications do not access this class;
* instead they access files across all filesystems using {@link FileContext}.
*
* Pathnames passed to AbstractFileSystem can be a fully qualified URI that
* matches the "this" filesystem (i.e. same scheme and authority)
* or a slash-relative name that is assumed to be relative
* to the root of the "this" filesystem.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
public abstract class AbstractFileSystem {
static final Log LOG = LogFactory.getLog(AbstractFileSystem.class);
/** Recording statistics per filesystem class. */
private static final Map<Class<? extends AbstractFileSystem>, Statistics>
STATISTICS_TABLE =
new IdentityHashMap<Class<? extends AbstractFileSystem>, Statistics>();
/** Cache of constructors for each filesystem class. */
private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
new ConcurrentHashMap<Class<?>, Constructor<?>>();
private static final Class<?>[] URI_CONFIG_ARGS =
new Class[]{URI.class, Configuration.class};
/** The statistics for this file system. */
protected Statistics statistics;
private final URI myUri;
protected Statistics getStatistics() {
return statistics;
}
/**
* Prohibits names which contain a ".", "..", ":" or "/".
*/
private static boolean isValidName(String src) {
// Check for ".." "." ":" "/"
StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
while(tokens.hasMoreTokens()) {
String element = tokens.nextToken();
if (element.equals("..") ||
element.equals(".") ||
(element.indexOf(":") >= 0)) {
return false;
}
}
return true;
}
/**
* Create an object for the given class and initialize it from conf.
* @param theClass class of which an object is created
* @param uri URI passed to the new object's constructor
* @param conf Configuration passed to the new object's constructor
* @return a new object
*/
@SuppressWarnings("unchecked")
static <T> T newInstance(Class<T> theClass,
URI uri, Configuration conf) {
T result;
try {
Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
if (meth == null) {
meth = theClass.getDeclaredConstructor(URI_CONFIG_ARGS);
meth.setAccessible(true);
CONSTRUCTOR_CACHE.put(theClass, meth);
}
result = meth.newInstance(uri, conf);
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
/**
* Create a file system instance for the specified uri using the conf.
* The conf is used to find the class name that implements the filesystem.
* The conf is also passed to the filesystem for its configuration.
* @param uri URI of the file system
* @param conf Configuration used to look up and initialize the filesystem
* @return a new AbstractFileSystem instance
* @throws IOException if no AbstractFileSystem is configured for the scheme
*/
private static AbstractFileSystem createFileSystem(URI uri,
Configuration conf) throws IOException {
Class<?> clazz = conf.getClass("fs.AbstractFileSystem." +
uri.getScheme() + ".impl", null);
if (clazz == null) {
throw new IOException("No AbstractFileSystem for scheme: "
+ uri.getScheme());
}
return (AbstractFileSystem) newInstance(clazz, uri, conf);
}
/**
* Get the statistics for a particular file system.
* @param scheme the filesystem scheme (used to name a new Statistics object)
* @param cls the class to lookup
* @return a statistics object
*/
protected static synchronized Statistics getStatistics(String scheme,
Class<? extends AbstractFileSystem> cls) {
Statistics result = STATISTICS_TABLE.get(cls);
if (result == null) {
result = new Statistics(scheme);
STATISTICS_TABLE.put(cls, result);
}
return result;
}
protected static synchronized void clearStatistics() {
for(Statistics stat: STATISTICS_TABLE.values()) {
stat.reset();
}
}
protected static synchronized void printStatistics() throws IOException {
for (Map.Entry<Class<? extends AbstractFileSystem>, Statistics> pair:
STATISTICS_TABLE.entrySet()) {
System.out.println(" FileSystem " + pair.getKey().getName() +
": " + pair.getValue());
}
}
/**
* The main factory method for creating a filesystem.
* Get a filesystem for the URI's scheme and authority.
* The scheme of the URI determines a configuration property name,
* <tt>fs.AbstractFileSystem.<i>scheme</i>.impl</tt> whose value names
* the AbstractFileSystem class.
* The entire URI and conf are passed to the AbstractFileSystem factory
* method.
* @param uri for the file system to be created.
* @param conf which is passed to the filesystem impl.
*/
static AbstractFileSystem get(final URI uri, final Configuration conf)
throws IOException {
return createFileSystem(uri, conf);
}
/**
* Constructor to be called by subclasses.
*
* @param uri for this file system.
* @param supportedScheme the scheme supported by the implementor
* @param authorityNeeded if true then the URI must have an authority; if false
* then the URI must have a null authority.
* @throws URISyntaxException
*/
protected AbstractFileSystem(final URI uri, final String supportedScheme,
final boolean authorityNeeded, final int defaultPort) throws URISyntaxException {
myUri = getUri(uri, supportedScheme, authorityNeeded, defaultPort);
statistics = getStatistics(supportedScheme, getClass());
}
protected void checkScheme(URI uri, String supportedScheme) {
String scheme = uri.getScheme();
if (scheme == null) {
throw new IllegalArgumentException("Uri without scheme: " + uri);
}
if (!scheme.equals(supportedScheme)) {
throw new IllegalArgumentException("Uri scheme " + uri
+ " does not match the scheme " + supportedScheme);
}
}
/**
* Get the URI for the file system based on the given URI. The path and query
* parts of the given URI are stripped out and the default filesystem port is
* used to form the URI.
*
* @param uri FileSystem URI.
* @param supportedScheme the scheme supported by this filesystem.
* @param authorityNeeded if true authority cannot be null in the URI. If
* false authority must be null.
* @param defaultPort default port to use if port is not specified in the URI.
* @return URI of the file system
* @throws URISyntaxException
*/
private URI getUri(URI uri, String supportedScheme,
boolean authorityNeeded, int defaultPort) throws URISyntaxException {
checkScheme(uri, supportedScheme);
// A filesystem implementation that requires authority must always
// specify default port
if (defaultPort < 0 && authorityNeeded) {
throw new IllegalArgumentException(
"FileSystem implementation error - default port " + defaultPort
+ " is not valid");
}
String authority = uri.getAuthority();
if (!authorityNeeded) {
if (authority != null) {
throw new IllegalArgumentException("Scheme with non-null authority: "
+ uri);
}
return new URI(supportedScheme + ":///");
}
if (authority == null) {
throw new IllegalArgumentException("Uri without authority: " + uri);
}
int port = uri.getPort();
port = port == -1 ? defaultPort : port;
return new URI(supportedScheme + "://" + uri.getHost() + ":" + port);
}
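// Worked example (assumed inputs): for uri = "hdfs://nn/foo?x=1" with
// supportedScheme = "hdfs", authorityNeeded = true and defaultPort = 8020,
// the path and query are stripped and the missing port is defaulted,
// yielding the filesystem URI "hdfs://nn:8020".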
/**
* The default port of this filesystem.
* @return the default port of this filesystem's URI scheme;
* a URI with a port of -1 maps to the default port.
*/
protected abstract int getUriDefaultPort();
/**
* Returns a URI whose scheme and authority identify this FileSystem.
* @return the uri of this filesystem.
*/
protected URI getUri() {
return myUri;
}
/**
* Check that a Path belongs to this FileSystem.
*
* If the path is a fully qualified URI, then its scheme and authority must
* match those of this file system. Otherwise the path must be a
* slash-relative name.
*/
protected void checkPath(Path path) {
URI uri = path.toUri();
String thatScheme = uri.getScheme();
String thatAuthority = uri.getAuthority();
if (thatScheme == null) {
if (thatAuthority == null) {
if (path.isUriPathAbsolute()) {
return;
}
throw new IllegalArgumentException("relative paths not allowed:" +
path);
} else {
throw new IllegalArgumentException(
"Path without scheme with non-null autorhrity:" + path);
}
}
String thisScheme = this.getUri().getScheme();
String thisAuthority = this.getUri().getAuthority();
// Schemes and authorities must match.
// Allow for null Authority for file:///
if (!thisScheme.equalsIgnoreCase(thatScheme) ||
(thisAuthority != null &&
!thisAuthority.equalsIgnoreCase(thatAuthority)) ||
(thisAuthority == null && thatAuthority != null)) {
throw new IllegalArgumentException("Wrong FS: " + path +
", expected: "+this.getUri());
}
int thisPort = this.getUri().getPort();
int thatPort = path.toUri().getPort();
if (thatPort == -1) { // -1 => defaultPort of Uri scheme
thatPort = this.getUriDefaultPort();
}
if (thisPort != thatPort) {
throw new IllegalArgumentException("Wrong FS: "+path+
", expected: "+this.getUri());
}
}
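// Examples, assuming this filesystem's URI is hdfs://nn:8020:
//   checkPath(new Path("/user/x"))               - ok, slash-relative name
//   checkPath(new Path("hdfs://nn:8020/user/x")) - ok, same scheme/authority
//   checkPath(new Path("file:///user/x"))        - IllegalArgumentException (wrong scheme)
//   checkPath(new Path("user/x"))                - IllegalArgumentException (relative)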
/**
* Get the path-part of a pathname. Checks that URI matches this filesystem
* and that the path-part is a valid name.
* @param p the path
* @return path-part of the Path p
*/
protected String getUriPath(final Path p) {
checkPath(p);
String s = p.toUri().getPath();
if (!isValidName(s)) {
throw new IllegalArgumentException("Path part " + s + " from URI" +
p + " is not a valid filename.");
}
return s;
}
/**
* Some file systems like LocalFileSystem have an initial workingDir
* that we use as the starting workingDir. For other file systems
* like HDFS there is no built-in notion of an initial workingDir.
*
* @return the initial workingDir if the filesystem has such a notion;
* otherwise null.
*/
protected Path getInitialWorkingDirectory() {
return null;
}
/**
* Return the current user's home directory in this filesystem.
* The default implementation returns "/user/$USER/".
*/
protected Path getHomeDirectory() {
return new Path("/user/"+System.getProperty("user.name")).makeQualified(
getUri(), null);
}
/**
* Return a set of server default configuration values.
* @return server default configuration values
* @throws IOException
*/
protected abstract FsServerDefaults getServerDefaults() throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#create(Path, EnumSet, Options.CreateOpts...)} except
* that the Path f must be fully qualified and the permission is absolute
* (i.e. umask has been applied).
*/
protected final FSDataOutputStream create(final Path f,
final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
throws IOException {
checkPath(f);
int bufferSize = -1;
short replication = -1;
long blockSize = -1;
int bytesPerChecksum = -1;
FsPermission permission = null;
Progressable progress = null;
Boolean createParent = null;
for (CreateOpts iOpt : opts) {
if (CreateOpts.BlockSize.class.isInstance(iOpt)) {
if (blockSize != -1) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
blockSize = ((CreateOpts.BlockSize) iOpt).getValue();
} else if (CreateOpts.BufferSize.class.isInstance(iOpt)) {
if (bufferSize != -1) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
bufferSize = ((CreateOpts.BufferSize) iOpt).getValue();
} else if (CreateOpts.ReplicationFactor.class.isInstance(iOpt)) {
if (replication != -1) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
replication = ((CreateOpts.ReplicationFactor) iOpt).getValue();
} else if (CreateOpts.BytesPerChecksum.class.isInstance(iOpt)) {
if (bytesPerChecksum != -1) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
} else if (CreateOpts.Perms.class.isInstance(iOpt)) {
if (permission != null) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
permission = ((CreateOpts.Perms) iOpt).getValue();
} else if (CreateOpts.Progress.class.isInstance(iOpt)) {
if (progress != null) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
progress = ((CreateOpts.Progress) iOpt).getValue();
} else if (CreateOpts.CreateParent.class.isInstance(iOpt)) {
if (createParent != null) {
throw new IllegalArgumentException("multiple varargs of same kind");
}
createParent = ((CreateOpts.CreateParent) iOpt).getValue();
} else {
throw new IllegalArgumentException("Unkown CreateOpts of type " +
iOpt.getClass().getName());
}
}
if (permission == null) {
throw new IllegalArgumentException("no permission supplied");
}
FsServerDefaults ssDef = getServerDefaults();
if (ssDef.getBlockSize() % ssDef.getBytesPerChecksum() != 0) {
throw new IOException("Internal error: default blockSize is" +
" not a multiple of default bytesPerChecksum ");
}
if (blockSize == -1) {
blockSize = ssDef.getBlockSize();
}
if (bytesPerChecksum == -1) {
bytesPerChecksum = ssDef.getBytesPerChecksum();
}
if (bufferSize == -1) {
bufferSize = ssDef.getFileBufferSize();
}
if (replication == -1) {
replication = ssDef.getReplication();
}
if (createParent == null) {
createParent = false;
}
if (blockSize % bytesPerChecksum != 0) {
throw new IllegalArgumentException(
"blockSize should be a multiple of checksumsize");
}
return this.createInternal(f, createFlag, permission, bufferSize,
replication, blockSize, progress, bytesPerChecksum, createParent);
}
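// A call sketch (argument values assumed): any opt that is not passed falls
// back to the server-side defaults fetched above, so
//   create(f, EnumSet.of(CreateFlag.CREATE), CreateOpts.perms(absPerm))
// picks up blockSize, bufferSize, replication and bytesPerChecksum from
// getServerDefaults().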
/**
* The specification of this method matches that of
* {@link #create(Path, EnumSet, Options.CreateOpts...)} except that the opts
* have been declared explicitly.
*/
protected abstract FSDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
short replication, long blockSize, Progressable progress,
int bytesPerChecksum, boolean createParent) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#mkdir(Path, FsPermission, boolean)} except that the Path
* f must be fully qualified and the permission is absolute (ie umask has been
* applied).
*/
protected abstract void mkdir(final Path dir,
final FsPermission permission, final boolean createParent)
throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#delete(Path, boolean)} except that Path f must be for
* this filesystem.
*/
protected abstract boolean delete(final Path f, final boolean recursive)
throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#open(Path)} except that Path f must be for this
* filesystem.
*/
protected FSDataInputStream open(final Path f) throws IOException {
return open(f, getServerDefaults().getFileBufferSize());
}
/**
* The specification of this method matches that of
* {@link FileContext#open(Path, int)} except that Path f must be for this
* filesystem.
*/
protected abstract FSDataInputStream open(final Path f, int bufferSize)
throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#setReplication(Path, short)} except that Path f must be
* for this filesystem.
*/
protected abstract boolean setReplication(final Path f,
final short replication) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
* f must be for this filesystem.
*/
protected final void rename(final Path src, final Path dst,
final Options.Rename... options) throws IOException {
boolean overwrite = false;
if (null != options) {
for (Rename option : options) {
if (option == Rename.OVERWRITE) {
overwrite = true;
}
}
}
renameInternal(src, dst, overwrite);
}
/**
* The specification of this method matches that of
* {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
* f must be for this filesystem and NO OVERWRITE is performed.
*
* Filesystems that do not have built-in overwrite need implement only this
* method and can take advantage of the default implementation of the other
* {@link #renameInternal(Path, Path, boolean)}.
*/
protected abstract void renameInternal(final Path src, final Path dst)
throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
* f must be for this filesystem.
*/
protected void renameInternal(final Path src, final Path dst,
boolean overwrite) throws IOException {
// Default implementation deals with overwrite in a non-atomic way
final FileStatus srcStatus = getFileStatus(src);
if (srcStatus == null) {
throw new FileNotFoundException("rename source " + src + " not found.");
}
FileStatus dstStatus;
try {
dstStatus = getFileStatus(dst);
} catch (IOException e) {
dstStatus = null;
}
if (dstStatus != null) {
if (srcStatus.isDir() != dstStatus.isDir()) {
throw new IOException("Source " + src + " Destination " + dst
+ " both should be either file or directory");
}
if (!overwrite) {
throw new FileAlreadyExistsException("rename destination " + dst
+ " already exists.");
}
// Delete the destination that is a file or an empty directory
if (dstStatus.isDir()) {
FileStatus[] list = listStatus(dst);
if (list != null && list.length != 0) {
throw new IOException(
"rename cannot overwrite non empty destination directory " + dst);
}
}
delete(dst, false);
} else {
final Path parent = dst.getParent();
final FileStatus parentStatus = getFileStatus(parent);
if (parentStatus == null) {
throw new FileNotFoundException("rename destination parent " + parent
+ " not found.");
}
if (!parentStatus.isDir()) {
throw new ParentNotDirectoryException("rename destination parent "
+ parent + " is a file.");
}
}
renameInternal(src, dst);
}
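// Trace of the overwrite path (assumed state): if dst exists as an empty
// directory and OVERWRITE was requested, dst is deleted above and the
// non-overwriting renameInternal(src, dst) finishes the rename; a non-empty
// destination directory is rejected before anything is deleted.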
/**
* The specification of this method matches that of
* {@link FileContext#setPermission(Path, FsPermission)} except that Path f
* must be for this filesystem.
*/
protected abstract void setPermission(final Path f,
final FsPermission permission) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#setOwner(Path, String, String)} except that Path f must
* be for this filesystem.
*/
protected abstract void setOwner(final Path f, final String username,
final String groupname) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#setTimes(Path, long, long)} except that Path f must be
* for this filesystem.
*/
protected abstract void setTimes(final Path f, final long mtime,
final long atime) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#getFileChecksum(Path)} except that Path f must be for
* this filesystem.
*/
protected abstract FileChecksum getFileChecksum(final Path f)
throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#getFileStatus(Path)} except that Path f must be
* for this filesystem.
*/
protected abstract FileStatus getFileStatus(final Path f) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#getFileBlockLocations(Path, long, long)} except that
* Path f must be for this filesystem.
*/
protected abstract BlockLocation[] getFileBlockLocations(final Path f,
final long start, final long len) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#getFsStatus(Path)} except that Path f must be for this
* filesystem.
*/
protected FsStatus getFsStatus(final Path f) throws IOException {
// default impl gets FsStatus of root
return getFsStatus();
}
/**
* The specification of this method matches that of
* {@link FileContext#getFsStatus(Path)} except that Path f must be for this
* filesystem.
*/
protected abstract FsStatus getFsStatus() throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#listStatus(Path)} except that Path f must be for this
* filesystem.
*/
protected abstract FileStatus[] listStatus(final Path f) throws IOException;
/**
* The specification of this method matches that of
* {@link FileContext#setVerifyChecksum(boolean, Path)} except that Path f
* must be for this filesystem.
*/
protected abstract void setVerifyChecksum(final boolean verifyChecksum)
throws IOException;
}
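The reflective factory above looks up a (URI, Configuration) constructor, so that is the shape a concrete filesystem must provide. A minimal sketch of an implementor's entry point follows (the demo scheme, port 9000, and class name are assumptions, not part of this patch; the remaining abstract operations are elided by leaving the class abstract):

public abstract class DemoFs extends AbstractFileSystem {
  protected DemoFs(final URI theUri, final Configuration conf)
      throws URISyntaxException {
    // "demo" scheme, authority required, default port 9000 - all assumed
    super(theUri, "demo", true, 9000);
  }
  @Override
  protected int getUriDefaultPort() {
    return 9000;
  }
}

With fs.AbstractFileSystem.demo.impl pointing at such a class, AbstractFileSystem.get() can instantiate it through newInstance().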

View File

@ -0,0 +1,462 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.*;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
/**
* Abstract Checksummed Fs.
* It provides a basic implementation of a checksummed filesystem,
* which creates a checksum file for each raw file.
* It generates and verifies checksums at the client side.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
public abstract class ChecksumFs extends FilterFs {
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
private int defaultBytesPerChecksum = 512;
private boolean verifyChecksum = true;
public static double getApproxChkSumLength(long size) {
return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
}
public ChecksumFs(AbstractFileSystem theFs)
throws IOException, URISyntaxException {
super(theFs);
defaultBytesPerChecksum =
getMyFs().getServerDefaults().getBytesPerChecksum();
}
/**
* Set whether to verify checksum.
*/
public void setVerifyChecksum(boolean inVerifyChecksum) {
this.verifyChecksum = inVerifyChecksum;
}
/** get the raw file system. */
public AbstractFileSystem getRawFs() {
return getMyFs();
}
/** Return the name of the checksum file associated with a file.*/
public Path getChecksumFile(Path file) {
return new Path(file.getParent(), "." + file.getName() + ".crc");
}
/** Return true iff file is a checksum file name.*/
public static boolean isChecksumFile(Path file) {
String name = file.getName();
return name.startsWith(".") && name.endsWith(".crc");
}
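// Example: the checksum file for "/a/b" is "/a/.b.crc", and
// isChecksumFile(new Path("/a/.b.crc")) returns true.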
/** Return the length of the checksum file given the size of the
* actual file.
**/
public long getChecksumFileLength(Path file, long fileSize) {
return getChecksumLength(fileSize, getBytesPerSum());
}
/** Return the bytes Per Checksum. */
public int getBytesPerSum() {
return defaultBytesPerChecksum;
}
private int getSumBufferSize(int bytesPerSum, int bufferSize)
throws IOException {
int defaultBufferSize = getMyFs().getServerDefaults().getFileBufferSize();
int proportionalBufferSize = bufferSize / bytesPerSum;
return Math.max(bytesPerSum,
Math.max(proportionalBufferSize, defaultBufferSize));
}
/*******************************************************
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker {
public static final Log LOG
= LogFactory.getLog(FSInputChecker.class);
private static final int HEADER_LENGTH = 8;
private ChecksumFs fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
private int bytesPerSum = 1;
private long fileLen = -1L;
public ChecksumFSInputChecker(ChecksumFs fs, Path file)
throws IOException {
this(fs, file, fs.getServerDefaults().getFileBufferSize());
}
public ChecksumFSInputChecker(ChecksumFs fs, Path file, int bufferSize)
throws IOException {
super(file, fs.getFileStatus(file).getReplication());
this.datas = fs.getRawFs().open(file, bufferSize);
this.fs = fs;
Path sumFile = fs.getChecksumFile(file);
try {
int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(),
bufferSize);
sums = fs.getRawFs().open(sumFile, sumBufferSize);
byte[] version = new byte[CHECKSUM_VERSION.length];
sums.readFully(version);
if (!Arrays.equals(version, CHECKSUM_VERSION)) {
throw new IOException("Not a checksum file: "+sumFile);
}
this.bytesPerSum = sums.readInt();
set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
} catch (FileNotFoundException e) { // quietly ignore
set(fs.verifyChecksum, null, 1, 0);
} catch (IOException e) { // loudly ignore
LOG.warn("Problem opening checksum file: "+ file +
". Ignoring exception: " +
StringUtils.stringifyException(e));
set(fs.verifyChecksum, null, 1, 0);
}
}
private long getChecksumFilePos(long dataPos) {
return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
}
protected long getChunkPosition(long dataPos) {
return dataPos/bytesPerSum*bytesPerSum;
}
public int available() throws IOException {
return datas.available() + super.available();
}
public int read(long position, byte[] b, int off, int len)
throws IOException {
// parameter check
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
if (position<0) {
throw new IllegalArgumentException(
"Parameter position can not to be negative");
}
ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
checker.seek(position);
int nread = checker.read(b, off, len);
checker.close();
return nread;
}
public void close() throws IOException {
datas.close();
if (sums != null) {
sums.close();
}
set(fs.verifyChecksum, null, 1, 0);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
final long sumsPos = getChecksumFilePos(targetPos);
fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
final boolean newDataSource = datas.seekToNewSource(targetPos);
return sums.seekToNewSource(sumsPos) || newDataSource;
}
@Override
protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
boolean eof = false;
if (needChecksum()) {
try {
final long checksumPos = getChecksumFilePos(pos);
if (checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
sums.readFully(checksum);
} catch (EOFException e) {
eof = true;
}
len = bytesPerSum;
}
if (pos != datas.getPos()) {
datas.seek(pos);
}
final int nread = readFully(datas, buf, offset, len);
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
return nread;
}
/* Return the file length */
private long getFileLength() throws IOException {
if (fileLen==-1L) {
fileLen = fs.getFileStatus(file).getLen();
}
return fileLen;
}
/**
* Skips over and discards <code>n</code> bytes of data from the
* input stream.
*
* The <code>skip</code> method may skip over some smaller number of bytes
* if end of file is reached before <code>n</code> bytes have been skipped.
* The actual number of bytes skipped is returned. If <code>n</code> is
* negative, no bytes are skipped.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
* @exception IOException if an I/O error occurs.
* ChecksumException if the chunk to skip to is corrupted
*/
public synchronized long skip(long n) throws IOException {
final long curPos = getPos();
final long fileLength = getFileLength();
if (n+curPos > fileLength) {
n = fileLength - curPos;
}
return super.skip(n);
}
/**
* Seek to the given position in the stream.
* The next read() will be from that position.
*
* <p>This method does not allow seeking past the end of the file.
* Doing so produces an IOException.
*
* @param pos the position to seek to.
* @exception IOException if an I/O error occurs or seeks after EOF
* ChecksumException if the chunk to seek to is corrupted
*/
public synchronized void seek(long pos) throws IOException {
if (pos>getFileLength()) {
throw new IOException("Cannot seek after EOF");
}
super.seek(pos);
}
}
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new FSDataInputStream(
new ChecksumFSInputChecker(this, f, bufferSize));
}
/**
* Calculates the length of the checksum file in bytes.
* @param size the length of the data file in bytes
* @param bytesPerSum the number of bytes in a checksum block
* @return the number of bytes in the checksum file
*/
public static long getChecksumLength(long size, int bytesPerSum) {
//the checksum length is one 4-byte CRC per bytesPerSum-byte chunk (rounded
//up), plus the header bytes written at the beginning of the checksum file.
return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
CHECKSUM_VERSION.length + 4;
}
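// Worked example: size = 2048, bytesPerSum = 512 gives 4 chunks * 4 CRC
// bytes = 16, plus the 8-byte header (4 version bytes + one 4-byte int),
// i.e. 24 bytes in total.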
/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file,
final EnumSet<CreateFlag> createFlag,
final FsPermission absolutePermission, final int bufferSize,
final short replication, final long blockSize,
final Progressable progress, final int bytesPerChecksum,
final boolean createParent) throws IOException {
super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
this.datas = fs.getRawFs().createInternal(file, createFlag,
absolutePermission, bufferSize, replication, blockSize, progress,
bytesPerChecksum, createParent);
// Now create the checksum file; adjust the buffer size
int bytesPerSum = fs.getBytesPerSum();
int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
EnumSet.of(CreateFlag.OVERWRITE), absolutePermission, sumBufferSize,
replication, blockSize, progress, bytesPerChecksum, createParent);
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
sums.writeInt(bytesPerSum);
}
public void close() throws IOException {
flushBuffer();
sums.close();
datas.close();
}
@Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException {
datas.write(b, offset, len);
sums.write(checksum);
}
}
@Override
protected FSDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
int bufferSize, short replication, long blockSize, Progressable progress,
int bytesPerChecksum, boolean createParent) throws IOException {
final FSDataOutputStream out = new FSDataOutputStream(
new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
bufferSize, replication, blockSize, progress,
bytesPerChecksum, createParent), null);
return out;
}
/** Check if exists.
* @param f source file
*/
private boolean exists(Path f) throws IOException {
try {
return getMyFs().getFileStatus(f) != null;
} catch (FileNotFoundException e) {
return false;
}
}
/** True iff the named path is a directory.
* Note: Avoid using this method. Instead reuse the FileStatus
* returned by getFileStatus() or listStatus() methods.
*/
private boolean isDirectory(Path f) throws IOException {
try {
return getMyFs().getFileStatus(f).isDir();
} catch (FileNotFoundException e) {
return false; // f does not exist
}
}
/**
* Set replication for an existing file.
* Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
* @param src file name
* @param replication new replication
* @throws IOException
* @return true if successful;
* false if file does not exist or is a directory
*/
@Override
public boolean setReplication(Path src, short replication)
throws IOException {
boolean value = getMyFs().setReplication(src, replication);
if (!value) {
return false;
}
Path checkFile = getChecksumFile(src);
if (exists(checkFile)) {
getMyFs().setReplication(checkFile, replication);
}
return true;
}
/**
* Rename files/dirs.
*/
@Override
public void renameInternal(Path src, Path dst) throws IOException {
if (isDirectory(src)) {
getMyFs().rename(src, dst);
} else {
getMyFs().rename(src, dst);
Path checkFile = getChecksumFile(src);
if (exists(checkFile)) { //try to rename checksum
if (isDirectory(dst)) {
getMyFs().rename(checkFile, dst);
} else {
getMyFs().rename(checkFile, getChecksumFile(dst));
}
}
}
}
/**
* Implement the delete(Path, boolean) in checksum
* file system.
*/
public boolean delete(Path f, boolean recursive) throws IOException {
FileStatus fstatus = null;
try {
fstatus = getMyFs().getFileStatus(f);
} catch(FileNotFoundException e) {
return false;
}
if (fstatus.isDir()) {
//this works since the crcs are in the same
//directories as the files, so we just delete
//everything in the underlying filesystem
return getMyFs().delete(f, recursive);
} else {
Path checkFile = getChecksumFile(f);
if (exists(checkFile)) {
getMyFs().delete(checkFile, true);
}
return getMyFs().delete(f, true);
}
}
/**
* Report a checksum error to the file system.
* @param f the file name containing the error
* @param in the stream open on the file
* @param inPos the position of the beginning of the bad data in the file
* @param sums the stream open on the checksum file
* @param sumsPos the position of the beginning of the bad data in the
* checksum file
* @return if retry is necessary
*/
public boolean reportChecksumFailure(Path f, FSDataInputStream in,
long inPos, FSDataInputStream sums, long sumsPos) {
return false;
}
}

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
/**
* Implementation of AbstractFileSystem based on the existing implementation of
* {@link FileSystem}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public abstract class DelegateToFileSystem extends AbstractFileSystem {
protected final FileSystem fsImpl;
protected DelegateToFileSystem(URI theUri, FileSystem theFsImpl,
Configuration conf, String supportedScheme, boolean authorityRequired)
throws IOException, URISyntaxException {
super(theUri, supportedScheme, authorityRequired,
FileSystem.getDefaultUri(conf).getPort());
fsImpl = theFsImpl;
fsImpl.initialize(theUri, conf);
fsImpl.statistics = getStatistics();
}
@Override
protected Path getInitialWorkingDirectory() {
return fsImpl.getInitialWorkingDirectory();
}
@Override
@SuppressWarnings("deprecation") // call to primitiveCreate
protected FSDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
short replication, long blockSize, Progressable progress,
int bytesPerChecksum, boolean createParent) throws IOException {
checkPath(f);
// Default impl assumes that permissions do not matter
// calling the regular create is good enough.
// FSs that implement permissions should override this.
if (!createParent) { // parent must exist.
// since this.create makes parent dirs automatically
// we must throw exception if parent does not exist.
final FileStatus stat = getFileStatus(f.getParent());
if (stat == null) {
throw new FileNotFoundException("Missing parent:" + f);
}
if (!stat.isDir()) {
throw new ParentNotDirectoryException("parent is not a dir:" + f);
}
// parent does exist - go ahead with create of file.
}
return fsImpl.primitiveCreate(f, absolutePermission, flag,
bufferSize, replication, blockSize, progress, bytesPerChecksum);
}
@Override
protected boolean delete(Path f, boolean recursive) throws IOException {
checkPath(f);
return fsImpl.delete(f, recursive);
}
@Override
protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
throws IOException {
checkPath(f);
return fsImpl.getFileBlockLocations(f, start, len);
}
@Override
protected FileChecksum getFileChecksum(Path f) throws IOException {
checkPath(f);
return fsImpl.getFileChecksum(f);
}
@Override
protected FileStatus getFileStatus(Path f) throws IOException {
checkPath(f);
return fsImpl.getFileStatus(f);
}
@Override
protected FsStatus getFsStatus() throws IOException {
return fsImpl.getStatus();
}
@Override
protected FsServerDefaults getServerDefaults() throws IOException {
return fsImpl.getServerDefaults();
}
@Override
protected int getUriDefaultPort() {
return 0;
}
@Override
protected FileStatus[] listStatus(Path f) throws IOException {
checkPath(f);
return fsImpl.listStatus(f);
}
@Override
@SuppressWarnings("deprecation") // call to primitiveMkdir
protected void mkdir(Path dir, FsPermission permission, boolean createParent)
throws IOException {
checkPath(dir);
fsImpl.primitiveMkdir(dir, permission, createParent);
}
@Override
protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
checkPath(f);
return fsImpl.open(f, bufferSize);
}
@Override
@SuppressWarnings("deprecation") // call to rename
protected void renameInternal(Path src, Path dst) throws IOException {
checkPath(src);
checkPath(dst);
fsImpl.rename(src, dst, Options.Rename.NONE);
}
@Override
protected void setOwner(Path f, String username, String groupname)
throws IOException {
checkPath(f);
fsImpl.setOwner(f, username, groupname);
}
@Override
protected void setPermission(Path f, FsPermission permission)
throws IOException {
checkPath(f);
fsImpl.setPermission(f, permission);
}
@Override
protected boolean setReplication(Path f, short replication)
throws IOException {
checkPath(f);
return fsImpl.setReplication(f, replication);
}
@Override
protected void setTimes(Path f, long mtime, long atime) throws IOException {
checkPath(f);
fsImpl.setTimes(f, mtime, atime);
}
@Override
protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
fsImpl.setVerifyChecksum(verifyChecksum);
}
}
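A concrete delegate then reduces to little more than a constructor. A hedged sketch of a local-filesystem binding (the class name DemoLocalFs is hypothetical; RawLocalFileSystem is the existing FileSystem being wrapped):

public class DemoLocalFs extends DelegateToFileSystem {
  DemoLocalFs(final URI theUri, final Configuration conf)
      throws IOException, URISyntaxException {
    // file: URIs carry no authority, hence authorityRequired = false
    super(theUri, new RawLocalFileSystem(), conf, "file", false);
  }
}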

View File

@ -37,13 +37,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
* The FileContext class provides an interface to the application writer for
@ -51,81 +50,102 @@ import org.apache.hadoop.util.Progressable;
* It provides a set of methods for the usual operations: create, open,
* list, etc.
*
* *** Path Names ***
* <p>
* <b> *** Path Names *** </b>
* <p>
*
* The Hadoop filesystem supports a URI name space and URI names.
* It offers a forest of filesystems that can be referenced using fully
* qualified URIs.
*
* Two common Hadoop filesystems implementations are
* the local filesystem: file:///path
* the hdfs filesystem hdfs://nnAddress:nnPort/path
*
* <ul>
* <li> the local filesystem: file:///path
* <li> the hdfs filesystem hdfs://nnAddress:nnPort/path
* </ul>
* While URI names are very flexible, they require knowing the name or address
* of the server. For convenience one often wants to access the default system
* in your environment without knowing its name/address. This has an
* in one's environment without knowing its name/address. This has an
* additional benefit that it allows one to change one's default fs
* (say your admin moves you from cluster1 to cluster2).
*
* Too facilitate this Hadoop supports a notion of a default filesystem.
* (e.g. admin moves application from cluster1 to cluster2).
* <p>
* To facilitate this, Hadoop supports a notion of a default filesystem.
* The user can set his default filesystem, although this is
* typically set up for you in your environment in your default config.
* typically set up for you in your environment via your default config.
* A default filesystem implies a default scheme and authority; slash-relative
* names (such as /foo/bar) are resolved relative to that default FS.
* Similarly a user can also have working-directory-relative names (i.e. names
* not starting with a slash). While the working directory is generally in the
* same default FS, the wd can be in a different FS; in particular, changing
* the default filesystem DOES NOT change the working directory,
*
* same default FS, the wd can be in a different FS.
* <p>
* Hence Hadoop path names can be one of:
* fully qualified URI: scheme://authority/path
* slash relative names: /path - relative to the default filesystem
* wd-relative names: path - relative to the working dir
*
* Relative paths with scheme (scheme:foo/bar) are illegal
*
* ****The Role of the FileContext and configuration defaults****
* <ul>
* <li> fully qualified URI: scheme://authority/path
* <li> slash relative names: /path - relative to the default filesystem
* <li> wd-relative names: path - relative to the working dir
* </ul>
* Relative paths with scheme (scheme:foo/bar) are illegal.
* <p>
* <b>****The Role of the FileContext and configuration defaults****</b>
* <p>
* The FileContext provides file namespace context for resolving file names;
* it also contains the umask for permissions. In that sense it is like the
* per-process file-related state in a Unix system.
* These, in general, are obtained from the default configuration file
* in your environment, (@see {@link Configuration}
*
* These two properties
* <ul>
* <li> default file system (i.e. your slash)
* <li> umask
* </ul>
* in general, are obtained from the default configuration file
* in your environment, (@see {@link Configuration}).
* No other configuration parameters are obtained from the default config as
* far as the file context layer is concerned. All filesystem instances
* (i.e. deployments of filesystems) have default properties; we call these
* server side (SS) defaults. Operations like create allow one to select many
* properties: either pass them in as explicit parameters or
* one can choose to used the SS properties.
*
* properties: either pass them in as explicit parameters or use
* the SS properties.
* <p>
* The filesystem related SS defaults are
* - the home directory (default is "/user/<userName>")
* - the initial wd (only for local fs)
* - replication factor
* - block size
* - buffer size
* - bytesPerChecksum (if used).
* <ul>
* <li> the home directory (default is "/user/userName")
* <li> the initial wd (only for local fs)
* <li> replication factor
* <li> block size
* <li> buffer size
* <li> bytesPerChecksum (if used).
* </ul>
*
*
* *** Usage Model for the FileContext class ***
*
* <p>
* <b> *** Usage Model for the FileContext class *** </b>
* <p>
* Example 1: use the default config read from the $HADOOP_CONFIG/core.xml.
* Unspecified values come from core-defaults.xml in the release jar.
*
* myFiles = getFileContext(); // uses the default config
* myFiles.create(path, ...);
* myFiles.setWorkingDir(path)
* myFiles.open (path, ...);
*
* Example 2: Use a specific config, ignoring $HADOOP_CONFIG
* configX = someConfigSomeOnePassedToYou.
* myFContext = getFileContext(configX); //configX not changed but passeddown
* myFContext.create(path, ...);
* myFContext.setWorkingDir(path)
*
* Other ways of creating new FileContexts:
* getLocalFSFileContext(...) // local filesystem is the default FS
* getLocalFileContext(URI, ...) // where specified URI is default FS.
* Unspecified values come from core-defaults.xml in the release jar.
* <ul>
* <li> myFContext = FileContext.getFileContext(); // uses the default config
* // which has your default FS
* <li> myFContext.create(path, ...);
* <li> myFContext.setWorkingDir(path)
* <li> myFContext.open (path, ...);
* </ul>
* Example 2: Get a FileContext with a specific URI as the default FS
* <ul>
* <li> myFContext = FileContext.getFileContext(URI)
* <li> myFContext.create(path, ...);
* ...
* </ul>
* Example 3: FileContext with local file system as the default
* <ul>
* <li> myFContext = FileContext.getLocalFSFileContext()
* <li> myFContext.create(path, ...);
* <li> ...
* </ul>
* Example 4: Use a specific config, ignoring $HADOOP_CONFIG
* Generally you should not need to use a config unless you are doing
* <ul>
* <li> configX = someConfigSomeOnePassedToYou.
* <li> myFContext = getFileContext(configX); //configX not changed but passed down
* <li> myFContext.create(path, ...);
* <li>...
* </ul>
*
*/
@ -135,41 +155,44 @@ import org.apache.hadoop.util.Progressable;
public final class FileContext {
public static final Log LOG = LogFactory.getLog(FileContext.class);
public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
/**
* List of files that should be deleted on JVM shutdown
* List of files that should be deleted on JVM shutdown.
*/
final static Map<FileContext, Set<Path>> deleteOnExit =
static final Map<FileContext, Set<Path>> DELETE_ON_EXIT =
new IdentityHashMap<FileContext, Set<Path>>();
/** JVM shutdown hook thread */
final static FileContextFinalizer finalizer =
/** JVM shutdown hook thread. */
static final FileContextFinalizer FINALIZER =
new FileContextFinalizer();
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
public boolean accept(final Path file) {
return true;
}
};
/**
* The FileContext is defined by:
* 1) defaultFS (slash)
* 2) wd
* 3) umask
*
*/
private final FileSystem defaultFS; // the default FS for this FileContext.
private final AbstractFileSystem defaultFS; //default FS for this FileContext.
private Path workingDir; // Fully qualified
private FsPermission umask;
private final Configuration conf; // passed to the filesystem below
// When we move to new AbstractFileSystem
// then it is not really needed except for
// undocumented config vars;
private final Configuration conf;
private FileContext(final FileSystem defFs, final FsPermission theUmask,
final Configuration aConf) {
private FileContext(final AbstractFileSystem defFs,
final FsPermission theUmask, final Configuration aConf) {
defaultFS = defFs;
umask = FsPermission.getUMask(aConf);
conf = aConf;
/*
* Init the wd.
* WorkingDir is implemented at the FileContext layer
* NOT at the FileSystem layer.
* NOT at the AbstractFileSystem layer.
* If the DefaultFS, such as localFilesystem has a notion of
* builtin WD, we use that as the initial WD.
* Otherwise the WD is initialized to the home directory.
@ -205,21 +228,20 @@ public final class FileContext {
* Delete all the paths that were marked as delete-on-exit.
*/
static void processDeleteOnExit() {
synchronized (deleteOnExit) {
Set<Entry<FileContext, Set<Path>>> set = deleteOnExit.entrySet();
synchronized (DELETE_ON_EXIT) {
Set<Entry<FileContext, Set<Path>>> set = DELETE_ON_EXIT.entrySet();
for (Entry<FileContext, Set<Path>> entry : set) {
FileContext fc = entry.getKey();
Set<Path> paths = entry.getValue();
for (Path path : paths) {
try {
fc.delete(path, true);
}
catch (IOException e) {
} catch (IOException e) {
LOG.warn("Ignoring failure to deleteOnExit for path " + path);
}
}
}
deleteOnExit.clear();
DELETE_ON_EXIT.clear();
}
}
@ -241,30 +263,27 @@ public final class FileContext {
* @return the filesystem of the path
* @throws IOException
*/
private FileSystem getFSofPath(final Path absOrFqPath) throws IOException {
private AbstractFileSystem getFSofPath(final Path absOrFqPath)
throws IOException {
checkNotSchemeWithRelative(absOrFqPath);
if (!absOrFqPath.isAbsolute() && absOrFqPath.toUri().getScheme() == null) {
throw new IllegalArgumentException(
"FileContext Bug: path is relative");
}
// TBD cleanup this impl once we create a new FileSystem to replace current
// one - see HADOOP-6223.
try {
// Is it the default FS for this FileContext?
defaultFS.checkPath(absOrFqPath);
return defaultFS;
} catch (Exception e) { // it is different FileSystem
return FileSystem.get(absOrFqPath.toUri(), conf);
return AbstractFileSystem.get(absOrFqPath.toUri(), conf);
}
}
/**
* Protected static factory methods for getting FileContexts
* that take a FileSystem as input. To be used for testing.
* Protected since new FileSystem will be protected.
* Note new file contexts are created for each call.
* that take an AbstractFileSystem as input. To be used for testing.
*/
/**
@ -276,27 +295,24 @@ public final class FileContext {
* @return new FileContext with the specified FS as the default.
* @throws IOException if the FileContext cannot be created
*/
protected static FileContext getFileContext(final FileSystem defFS,
protected static FileContext getFileContext(final AbstractFileSystem defFS,
final Configuration aConf) throws IOException {
return new FileContext(defFS, FsPermission.getUMask(aConf), aConf);
}
/**
* Create a FileContext for specified FileSystem using the default config.
* Create a FileContext for specified filesystem using the default config.
*
* @param defaultFS
* @return a FileSystem for the specified URI
* @throws IOException if the filesysem with specified cannot be created
* @return a FileContext with the specified AbstractFileSystem
* as the default FS.
* @throws IOException if the FileContext cannot be created
*/
protected static FileContext getFileContext(final FileSystem defaultFS)
throws IOException {
protected static FileContext getFileContext(
final AbstractFileSystem defaultFS) throws IOException {
return getFileContext(defaultFS, new Configuration());
}
public static final URI LOCAL_FS_URI = URI.create("file:///");
public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
/**
* Static Factory methods for getting a FileContext.
* Note new file contexts are created for each call.
@ -310,7 +326,7 @@ public final class FileContext {
* The keys relevant to the FileContext layer are extracted at time of
* construction. Changes to the config after the call are ignored
* by the FileContext layer.
* The conf is passed to lower layers like FileSystem and HDFS which
* The conf is passed to lower layers like AbstractFileSystem and HDFS which
* pick up their own config variables.
*/
@ -320,7 +336,7 @@ public final class FileContext {
* Unspecified key-values for config are defaulted from core-defaults.xml
* in the release jar.
*
* @throws IOException if default FileSystem in the config cannot be created
* @throws IOException if default filesystem in the config cannot be created
*/
public static FileContext getFileContext() throws IOException {
return getFileContext(new Configuration());
@ -334,7 +350,7 @@ public final class FileContext {
*/
public static FileContext getLocalFSFileContext() throws IOException {
if (localFsSingleton == null) {
localFsSingleton = getFileContext(LOCAL_FS_URI);
localFsSingleton = getFileContext(FsConstants.LOCAL_FS_URI);
}
return localFsSingleton;
}
@ -344,7 +360,7 @@ public final class FileContext {
* Create a FileContext for specified URI using the default config.
*
* @param defaultFsUri
* @return a FileSystem for the specified URI
* @return a FileContext with the specified URI as the default FS.
* @throws IOException if the filesystem with the specified URI cannot be created
*/
public static FileContext getFileContext(final URI defaultFsUri)
@ -362,15 +378,18 @@ public final class FileContext {
*/
public static FileContext getFileContext(final URI defaultFsUri,
final Configuration aConf) throws IOException {
return getFileContext(FileSystem.get(defaultFsUri, aConf), aConf);
return getFileContext(AbstractFileSystem.get(defaultFsUri, aConf), aConf);
}
/**
* Create a FileContext using the passed config.
* Generally it is better to use {@link #getFileContext(URI, Configuration)}
* instead of this one.
*
*
* @param aConf
* @return new FileContext
* @throws IOException if default FileSystem in the config cannot be created
* @throws IOException if default filesystem in the config cannot be created
*/
public static FileContext getFileContext(final Configuration aConf)
throws IOException {
@ -385,14 +404,14 @@ public final class FileContext {
*/
public static FileContext getLocalFSFileContext(final Configuration aConf)
throws IOException {
return getFileContext(LOCAL_FS_URI, aConf);
return getFileContext(FsConstants.LOCAL_FS_URI, aConf);
}
/* This method is needed for tests. */
@InterfaceAudience.Private
@InterfaceStability.Unstable /* return type will change to AFS once
HADOOP-6223 is completed */
protected FileSystem getDefaultFileSystem() {
protected AbstractFileSystem getDefaultFileSystem() {
return defaultFS;
}
@ -463,59 +482,49 @@ public final class FileContext {
* @param f the file name to open
* @param createFlag gives the semantics of create: overwrite, append etc.
* @param opts - varargs of CreateOpt:
* Progress - to report progress on the operation - default null
* Permission - umask is applied against permisssion:
* default FsPermissions:getDefault()
* @see #setPermission(Path, FsPermission)
* CreateParent - create missing parent path
* default is to not create parents
*
* The defaults for the following are SS defaults of the
* file server implementing the tart path.
* Not all parameters make sense for all kinds of filesystem
* <ul>
* <li> Progress - to report progress on the operation - default null
* <li> Permission - umask is applied against permission:
* default is FsPermission.getDefault()
* <li> CreateParent - create missing parent path; default is to not
* create parents
* <li> The defaults for the following are server-side defaults of the
* file server implementing the target path.
* Not all parameters make sense for all kinds of filesystem
* - e.g. localFS ignores Blocksize, replication, checksum
* BufferSize - buffersize used in FSDataOutputStream
* Blocksize - block size for file blocks
* ReplicationFactor - replication for blocks
* BytesPerChecksum - bytes per checksum
*
*
* <ul>
* <li> BufferSize - buffersize used in FSDataOutputStream
* <li> Blocksize - block size for file blocks
* <li> ReplicationFactor - replication for blocks
* <li> BytesPerChecksum - bytes per checksum
* </ul>
* </ul>
*
* @throws IOException
*
* @see #setPermission(Path, FsPermission)
*/
@SuppressWarnings("deprecation") // call to primitiveCreate
public FSDataOutputStream create(final Path f,
final EnumSet<CreateFlag> createFlag,
CreateOpts... opts)
final EnumSet<CreateFlag> createFlag,
Options.CreateOpts... opts)
throws IOException {
Path absF = fixRelativePart(f);
FileSystem fsOfAbsF = getFSofPath(absF);
AbstractFileSystem fsOfAbsF = getFSofPath(absF);
// If one of the options is a permission, extract it & apply umask
// If not, add a default Perms and apply umask;
// FileSystem#create
// AbstractFileSystem#create
FsPermission permission = null;
CreateOpts.Perms permOpt =
(CreateOpts.Perms) CreateOpts.getOpt(CreateOpts.Perms.class, opts);
FsPermission permission = (permOpt != null) ? permOpt.getValue() :
FsPermission.getDefault();
permission = permission.applyUMask(umask);
if (opts != null) {
for (int i = 0; i < opts.length; ++i) {
if (opts[i] instanceof CreateOpts.Perms) {
if (permission != null)
throw new IllegalArgumentException("multiple permissions varargs");
permission = ((CreateOpts.Perms) opts[i]).getValue();
opts[i] = CreateOpts.perms(permission.applyUMask(umask));
}
}
}
CreateOpts[] theOpts = opts;
if (permission == null) { // no permission was set
CreateOpts[] newOpts = new CreateOpts[opts.length + 1];
System.arraycopy(opts, 0, newOpts, 0, opts.length);
newOpts[opts.length] =
CreateOpts.perms(FsPermission.getDefault().applyUMask(umask));
theOpts = newOpts;
}
return fsOfAbsF.primitiveCreate(absF, createFlag, theOpts);
CreateOpts[] updatedOpts =
CreateOpts.setOpt(CreateOpts.perms(permission), opts);
return fsOfAbsF.create(absF, createFlag, updatedOpts);
}
/**
@@ -529,14 +538,13 @@ public final class FileContext {
* @throws IOException when the operation fails, e.g. the caller is not
* authorized, or the parent does not exist and createParent is false.
*/
@SuppressWarnings("deprecation") // call to primitiveMkdir
public void mkdir(final Path dir, final FsPermission permission,
final boolean createParent)
throws IOException {
Path absDir = fixRelativePart(dir);
FsPermission absPerms = (permission == null ?
FsPermission.getDefault() : permission).applyUMask(umask);
getFSofPath(absDir).primitiveMkdir(absDir, absPerms, createParent);
getFSofPath(absDir).mkdir(absDir, absPerms, createParent);
}
/**
@@ -615,15 +623,15 @@ public final class FileContext {
* @param dst new path after rename
* @throws IOException on failure
*/
@SuppressWarnings("deprecation")
public void rename(final Path src, final Path dst, final Rename... options)
throws IOException {
public void rename(final Path src, final Path dst,
final Options.Rename... options) throws IOException {
final Path absSrc = fixRelativePart(src);
final Path absDst = fixRelativePart(dst);
FileSystem srcFS = getFSofPath(absSrc);
FileSystem dstFS = getFSofPath(absDst);
AbstractFileSystem srcFS = getFSofPath(absSrc);
AbstractFileSystem dstFS = getFSofPath(absDst);
if(!srcFS.getUri().equals(dstFS.getUri())) {
throw new IOException("Renames across FileSystems not supported");
throw new IOException("Renames across AbstractFileSystems not supported");
}
srcFS.rename(absSrc, absDst, options);
}
@@ -750,10 +758,10 @@ public final class FileContext {
*/
public FsStatus getFsStatus(final Path f) throws IOException {
if (f == null) {
return defaultFS.getStatus(null);
return defaultFS.getFsStatus(null);
}
final Path absF = fixRelativePart(f);
return getFSofPath(absF).getStatus(absF);
return getFSofPath(absF).getFsStatus(absF);
}
/**
@@ -827,15 +835,15 @@ public final class FileContext {
if (!exists(f)) {
return false;
}
synchronized (deleteOnExit) {
if (deleteOnExit.isEmpty() && !finalizer.isAlive()) {
Runtime.getRuntime().addShutdownHook(finalizer);
synchronized (DELETE_ON_EXIT) {
if (DELETE_ON_EXIT.isEmpty() && !FINALIZER.isAlive()) {
Runtime.getRuntime().addShutdownHook(FINALIZER);
}
Set<Path> set = deleteOnExit.get(this);
Set<Path> set = DELETE_ON_EXIT.get(this);
if (set == null) {
set = new TreeSet<Path>();
deleteOnExit.put(this, set);
DELETE_ON_EXIT.put(this, set);
}
set.add(f);
}
@@ -878,6 +886,31 @@ public final class FileContext {
return results.toArray(new FileStatus[results.size()]);
}
/**
* Return the {@link ContentSummary} of path f.
* @param f path whose summary is to be computed
* @return the {@link ContentSummary} of path f.
* @throws IOException if an I/O error occurs
*/
public ContentSummary getContentSummary(Path f) throws IOException {
FileStatus status = FileContext.this.getFileStatus(f);
if (!status.isDir()) {
// f is a file
return new ContentSummary(status.getLen(), 1, 0);
}
// f is a directory
long[] summary = {0, 0, 1};
for(FileStatus s : FileContext.this.listStatus(f)) {
ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
new ContentSummary(s.getLen(), 1, 0);
summary[0] += c.getLength();
summary[1] += c.getFileCount();
summary[2] += c.getDirectoryCount();
}
return new ContentSummary(summary[0], summary[1], summary[2]);
}
/**
* Filter files/directories in the given list of paths using default
* path filter.
@@ -1020,25 +1053,22 @@ public final class FileContext {
* @return an array of FileStatus objects
* @throws IOException if any I/O error occurs when fetching file status
*/
public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter)
throws IOException {
public FileStatus[] globStatus(final Path pathPattern,
final PathFilter filter) throws IOException {
URI uri = getFSofPath(fixRelativePart(pathPattern)).getUri();
String filename = pathPattern.toUri().getPath();
List<String> filePatterns = GlobExpander.expand(filename);
if (filePatterns.size() == 1) {
Path p = fixRelativePart(pathPattern);
FileSystem fs = getFSofPath(p);
URI uri = fs.getUri();
return globStatusInternal(uri, p, filter);
Path absPathPattern = fixRelativePart(pathPattern);
return globStatusInternal(uri, new Path(absPathPattern.toUri()
.getPath()), filter);
} else {
List<FileStatus> results = new ArrayList<FileStatus>();
for (String filePattern : filePatterns) {
Path p = new Path(filePattern);
p = fixRelativePart(p);
FileSystem fs = getFSofPath(p);
URI uri = fs.getUri();
FileStatus[] files = globStatusInternal(uri, p, filter);
for (String iFilePattern : filePatterns) {
Path iAbsFilePattern = fixRelativePart(new Path(iFilePattern));
FileStatus[] files = globStatusInternal(uri, iAbsFilePattern, filter);
for (FileStatus file : files) {
results.add(file);
}
@@ -1047,36 +1077,45 @@ public final class FileContext {
}
}
/**
*
* @param uri the URI of the filesystem for all paths in inPathPattern
* @param inPathPattern - without the scheme & authority (taken from uri)
* @param filter user-supplied path filter
* @return an array of FileStatus objects matching the pattern
* @throws IOException if an I/O error occurs
*/
private FileStatus[] globStatusInternal(
final URI uri, final Path inPathPattern, final PathFilter filter)
throws IOException {
Path[] parents = new Path[1];
int level = 0;
// comes in as full path, but just in case
final Path pathPattern = fixRelativePart(inPathPattern);
assert(inPathPattern.toUri().getScheme() == null &&
inPathPattern.toUri().getAuthority() == null &&
inPathPattern.isUriPathAbsolute());
String filename = pathPattern.toUri().getPath();
String filename = inPathPattern.toUri().getPath();
// path has only zero component
if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
Path p = pathPattern.makeQualified(uri, null);
Path p = inPathPattern.makeQualified(uri, null);
return getFileStatus(new Path[]{p});
}
// path has at least one component
String[] components = filename.split(Path.SEPARATOR);
// get the first component
if (pathPattern.isAbsolute()) {
parents[0] = new Path(Path.SEPARATOR);
level = 1;
} else {
parents[0] = new Path(Path.CUR_DIR);
}
// Path is absolute: its first component is "/", which corresponds
// to the root of the uri
parents[0] = new Path(new Path(uri), new Path("/"));
level = 1;
// glob the paths that match the parent path, i.e. [0, components.length-1]
boolean[] hasGlob = new boolean[]{false};
Path[] relParentPaths = globPathsLevel(parents, components, level, hasGlob);
Path[] relParentPaths =
globPathsLevel(parents, components, level, hasGlob);
FileStatus[] results;
if (relParentPaths == null || relParentPaths.length == 0) {
@@ -1212,12 +1251,6 @@ public final class FileContext {
}
}
private static final PathFilter DEFAULT_FILTER = new PathFilter() {
public boolean accept(final Path file) {
return true;
}
};
/* A class that could decide if a string matches the glob or not */
private static class GlobFilter implements PathFilter {
private PathFilter userFilter = DEFAULT_FILTER;
@@ -1401,7 +1434,7 @@ public final class FileContext {
}
/**
* Deletes all the paths in deleteOnExit on JVM shutdown
* Deletes all the paths in deleteOnExit on JVM shutdown.
*/
static class FileContextFinalizer extends Thread {
public synchronized void run() {

View File

@@ -148,6 +148,10 @@ public class FileStatus implements Writable, Comparable {
public Path getPath() {
return path;
}
public void setPath(final Path p) {
path = p;
}
/* These are provided so that these values could be loaded lazily
* by a filesystem (e.g. local file system).

View File

@@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
/**
* A <code>FilterFs</code> contains some other file system, which it uses as its
* basic file system, possibly transforming the data along the way or providing
* additional functionality. The class <code>FilterFs</code> itself simply
* overrides all methods of <code>AbstractFileSystem</code> with versions that
* pass all requests to the contained file system. Subclasses of
* <code>FilterFs</code> may further override some of these methods and may also
* provide additional methods and fields.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
public abstract class FilterFs extends AbstractFileSystem {
private final AbstractFileSystem myFs;
protected AbstractFileSystem getMyFs() {
return myFs;
}
protected FilterFs(AbstractFileSystem fs) throws IOException,
URISyntaxException {
super(fs.getUri(), fs.getUri().getScheme(),
fs.getUri().getAuthority() != null, fs.getUriDefaultPort());
myFs = fs;
}
@Override
protected Path getInitialWorkingDirectory() {
return myFs.getInitialWorkingDirectory();
}
@Override
protected FSDataOutputStream createInternal(Path f,
EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
short replication, long blockSize, Progressable progress,
int bytesPerChecksum, boolean createParent) throws IOException {
checkPath(f);
return myFs.createInternal(f, flag, absolutePermission, bufferSize,
replication, blockSize, progress, bytesPerChecksum, createParent);
}
@Override
protected boolean delete(Path f, boolean recursive) throws IOException {
checkPath(f);
return myFs.delete(f, recursive);
}
@Override
protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
throws IOException {
checkPath(f);
return myFs.getFileBlockLocations(f, start, len);
}
@Override
protected FileChecksum getFileChecksum(Path f) throws IOException {
checkPath(f);
return myFs.getFileChecksum(f);
}
@Override
protected FileStatus getFileStatus(Path f) throws IOException {
checkPath(f);
return myFs.getFileStatus(f);
}
@Override
protected FsStatus getFsStatus() throws IOException {
return myFs.getFsStatus();
}
@Override
protected FsServerDefaults getServerDefaults() throws IOException {
return myFs.getServerDefaults();
}
@Override
protected int getUriDefaultPort() {
return myFs.getUriDefaultPort();
}
@Override
protected FileStatus[] listStatus(Path f) throws IOException {
checkPath(f);
return myFs.listStatus(f);
}
@Override
protected void mkdir(Path dir, FsPermission permission, boolean createParent)
throws IOException {
checkPath(dir);
myFs.mkdir(dir, permission, createParent);
}
@Override
protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
checkPath(f);
return myFs.open(f, bufferSize);
}
@Override
protected void renameInternal(Path src, Path dst) throws IOException {
checkPath(src);
checkPath(dst);
myFs.rename(src, dst, Options.Rename.NONE);
}
@Override
protected void setOwner(Path f, String username, String groupname)
throws IOException {
checkPath(f);
myFs.setOwner(f, username, groupname);
}
@Override
protected void setPermission(Path f, FsPermission permission)
throws IOException {
checkPath(f);
myFs.setPermission(f, permission);
}
@Override
protected boolean setReplication(Path f, short replication)
throws IOException {
checkPath(f);
return myFs.setReplication(f, replication);
}
@Override
protected void setTimes(Path f, long mtime, long atime) throws IOException {
checkPath(f);
myFs.setTimes(f, mtime, atime);
}
@Override
protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
myFs.setVerifyChecksum(verifyChecksum);
}
}

View File

@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.net.URI;
/**
* FileSystem related constants.
*/
public interface FsConstants {
// URI for local filesystem
public static final URI LOCAL_FS_URI = URI.create("file:///");
// URI scheme for FTP
public static final String FTP_SCHEME = "ftp";
}

View File

@@ -33,7 +33,7 @@ public final class Options {
public static BlockSize blockSize(long bs) {
return new BlockSize(bs);
}
public static BufferSize bufferSize(short bs) {
public static BufferSize bufferSize(int bs) {
return new BufferSize(bs);
}
public static ReplicationFactor repFac(short rf) {
@@ -52,7 +52,7 @@ public final class Options {
return new CreateParent(false);
}
static class BlockSize extends CreateOpts {
public static class BlockSize extends CreateOpts {
private final long blockSize;
protected BlockSize(long bs) {
if (bs <= 0) {
@@ -61,10 +61,10 @@ public final class Options {
}
blockSize = bs;
}
long getValue() { return blockSize; }
public long getValue() { return blockSize; }
}
static class ReplicationFactor extends CreateOpts {
public static class ReplicationFactor extends CreateOpts {
private final short replication;
protected ReplicationFactor(short rf) {
if (rf <= 0) {
@@ -73,22 +73,22 @@ public final class Options {
}
replication = rf;
}
short getValue() { return replication; }
public short getValue() { return replication; }
}
static class BufferSize extends CreateOpts {
public static class BufferSize extends CreateOpts {
private final int bufferSize;
protected BufferSize(short bs) {
protected BufferSize(int bs) {
if (bs <= 0) {
throw new IllegalArgumentException(
"Buffer size must be greater than 0");
}
bufferSize = bs;
}
int getValue() { return bufferSize; }
public int getValue() { return bufferSize; }
}
static class BytesPerChecksum extends CreateOpts {
public static class BytesPerChecksum extends CreateOpts {
private final int bytesPerChecksum;
protected BytesPerChecksum(short bpc) {
if (bpc <= 0) {
@@ -97,10 +97,10 @@ public final class Options {
}
bytesPerChecksum = bpc;
}
int getValue() { return bytesPerChecksum; }
public int getValue() { return bytesPerChecksum; }
}
static class Perms extends CreateOpts {
public static class Perms extends CreateOpts {
private final FsPermission permissions;
protected Perms(FsPermission perm) {
if(perm == null) {
@@ -108,10 +108,10 @@ public final class Options {
}
permissions = perm;
}
FsPermission getValue() { return permissions; }
public FsPermission getValue() { return permissions; }
}
static class Progress extends CreateOpts {
public static class Progress extends CreateOpts {
private final Progressable progress;
protected Progress(Progressable prog) {
if(prog == null) {
@@ -119,14 +119,65 @@ public final class Options {
}
progress = prog;
}
Progressable getValue() { return progress; }
public Progressable getValue() { return progress; }
}
static class CreateParent extends CreateOpts {
private final Boolean createParent;
public static class CreateParent extends CreateOpts {
private final boolean createParent;
protected CreateParent(boolean createPar) {
createParent = createPar;}
Boolean getValue() { return createParent; }
public boolean getValue() { return createParent; }
}
/**
* Get an option of the desired type.
* @param theClass is the desired class of the opt
* @param opts - not null - at least one opt must be passed
* @return an opt from one of the opts of type theClass;
* returns null if there isn't one
*/
protected static CreateOpts getOpt(Class<? extends CreateOpts> theClass,
CreateOpts... opts) {
if (opts == null) {
throw new IllegalArgumentException("Null opt");
}
CreateOpts result = null;
for (int i = 0; i < opts.length; ++i) {
if (opts[i].getClass() == theClass) {
if (result != null)
throw new IllegalArgumentException("multiple blocksize varargs");
result = opts[i];
}
}
return result;
}
/**
* Set an option.
* @param newValue the option to be set
* @param opts - the option is set into this array of opts
* @return updated CreateOpts[] == opts + newValue
*/
protected static <T extends CreateOpts> CreateOpts[] setOpt(T newValue,
CreateOpts... opts) {
boolean alreadyInOpts = false;
if (opts != null) {
for (int i = 0; i < opts.length; ++i) {
if (opts[i].getClass() == newValue.getClass()) {
if (alreadyInOpts)
throw new IllegalArgumentException("multiple opts varargs");
alreadyInOpts = true;
opts[i] = newValue;
}
}
}
CreateOpts[] resultOpt = opts;
if (!alreadyInOpts) { // no newValue in opts
int oldLen = (opts == null) ? 0 : opts.length;
CreateOpts[] newOpts = new CreateOpts[oldLen + 1];
if (oldLen > 0) {
System.arraycopy(opts, 0, newOpts, 0, oldLen);
}
newOpts[oldLen] = newValue;
resultOpt = newOpts;
}
return resultOpt;
}
}

View File

@@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ftp;
import org.apache.hadoop.fs.CommonConfigurationKeys;
/**
* This class contains constants for configuration keys used
* in the ftp file system.
*
*/
public class FTPFileSystemConfigKeys extends CommonConfigurationKeys {
public static final String FTP_BLOCK_SIZE_KEY = "ftp.blocksize";
public static final long FTP_BLOCK_SIZE_DEFAULT = 64*1024*1024;
public static final String FTP_REPLICATION_KEY = "ftp.replication";
public static final short FTP_REPLICATION_DEFAULT = 1;
public static final String FTP_STREAM_BUFFER_SIZE_KEY =
"ftp.stream-buffer-size";
public static final int FTP_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String FTP_BYTES_PER_CHECKSUM_KEY =
"ftp.bytes-per-checksum";
public static final int FTP_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String FTP_CLIENT_WRITE_PACKET_SIZE_KEY =
"ftp.client-write-packet-size";
public static final int FTP_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
}

View File

@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ftp;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FsServerDefaults;
/**
* This class contains constants for configuration keys used
* in the ftp file system.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FtpConfigKeys extends CommonConfigurationKeys {
public static final String BLOCK_SIZE_KEY = "ftp.blocksize";
public static final long BLOCK_SIZE_DEFAULT = 4*1024;
public static final String REPLICATION_KEY = "ftp.replication";
public static final short REPLICATION_DEFAULT = 1;
public static final String STREAM_BUFFER_SIZE_KEY =
"ftp.stream-buffer-size";
public static final int STREAM_BUFFER_SIZE_DEFAULT = 1024*1024;
public static final String BYTES_PER_CHECKSUM_KEY =
"ftp.bytes-per-checksum";
public static final int BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
"ftp.client-write-packet-size";
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
protected static FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(
BLOCK_SIZE_DEFAULT,
BYTES_PER_CHECKSUM_DEFAULT,
CLIENT_WRITE_PACKET_SIZE_DEFAULT,
REPLICATION_DEFAULT,
STREAM_BUFFER_SIZE_DEFAULT);
}
}

View File

@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.ftp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.net.ftp.FTP;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.DelegateToFileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults;
/**
* The FtpFs implementation of AbstractFileSystem.
* This impl delegates to the old FileSystem.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
public class FtpFs extends DelegateToFileSystem {
/**
* This constructor has the signature needed by
* {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
*
* @param theUri which must be that of ftpFs
* @param conf
* @throws IOException
* @throws URISyntaxException
*/
FtpFs(final URI theUri, final Configuration conf) throws IOException,
URISyntaxException {
super(theUri, new FTPFileSystem(), conf, FsConstants.FTP_SCHEME, true);
}
@Override
protected int getUriDefaultPort() {
return FTP.DEFAULT_PORT;
}
@Override
protected FsServerDefaults getServerDefaults() throws IOException {
return FtpConfigKeys.getServerDefaults();
}
}

View File

@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.local;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FsServerDefaults;
/**
* This class contains constants for configuration keys used
* in the local file system, raw local fs and checksum fs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LocalConfigKeys extends CommonConfigurationKeys {
public static final String BLOCK_SIZE_KEY = "file.blocksize";
public static final long BLOCK_SIZE_DEFAULT = 64*1024*1024;
public static final String REPLICATION_KEY = "file.replication";
public static final short REPLICATION_DEFAULT = 1;
public static final String STREAM_BUFFER_SIZE_KEY = "file.stream-buffer-size";
public static final int STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String BYTES_PER_CHECKSUM_KEY = "file.bytes-per-checksum";
public static final int BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String CLIENT_WRITE_PACKET_SIZE_KEY =
"file.client-write-packet-size";
public static final int CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
protected static FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(
BLOCK_SIZE_DEFAULT,
BYTES_PER_CHECKSUM_DEFAULT,
CLIENT_WRITE_PACKET_SIZE_DEFAULT,
REPLICATION_DEFAULT,
STREAM_BUFFER_SIZE_DEFAULT);
}
}

View File

@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.local;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.ChecksumFs;
/**
* The LocalFs implementation of ChecksumFs.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
public class LocalFs extends ChecksumFs {
LocalFs(final Configuration conf) throws IOException, URISyntaxException {
super(new RawLocalFs(conf));
}
/**
* This constructor has the signature needed by
* {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
*
* @param theUri which must be that of localFs
* @param conf
* @throws IOException
* @throws URISyntaxException
*/
LocalFs(final URI theUri, final Configuration conf) throws IOException,
URISyntaxException {
this(conf);
}
}

View File

@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.local;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.RawLocalFileSystem;
/**
* The RawLocalFs implementation of AbstractFileSystem.
* This impl delegates to the old FileSystem.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
public class RawLocalFs extends DelegateToFileSystem {
RawLocalFs(final Configuration conf) throws IOException, URISyntaxException {
this(FsConstants.LOCAL_FS_URI, conf);
}
/**
* This constructor has the signature needed by
* {@link AbstractFileSystem#createFileSystem(URI, Configuration)}.
*
* @param theUri which must be that of localFs
* @param conf
* @throws IOException
* @throws URISyntaxException
*/
RawLocalFs(final URI theUri, final Configuration conf) throws IOException,
URISyntaxException {
super(theUri, new RawLocalFileSystem(), conf,
FsConstants.LOCAL_FS_URI.getScheme(), false);
}
@Override
protected int getUriDefaultPort() {
return -1; // No default port for file:///
}
@Override
protected FsServerDefaults getServerDefaults() throws IOException {
return LocalConfigKeys.getServerDefaults();
}
}

View File

@@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.StringTokenizer;
import junit.framework.Assert;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* <p>
* A collection of permission tests for the {@link FileContext}.
* This test should be used for testing an instance of FileContext
* that has been initialized to a specific default FileSystem such as
* LocalFileSystem, HDFS, S3, etc.
* </p>
* <p>
* To test a given {@link FileSystem} implementation create a subclass of this
* test and override {@link #setUp()} to initialize the <code>fc</code>
* {@link FileContext} instance variable.
*
* Since this is a JUnit 4 test, you can also do a single setup before
* the start of any tests.
* E.g.
* @BeforeClass public static void clusterSetupAtBegining()
* @AfterClass public static void ClusterShutdownAtEnd()
* </p>
*/
public class FileContextPermissionBase {
static final String TEST_ROOT_DIR = new Path(System.getProperty(
"test.build.data", "/tmp")).toString().replace(' ', '_')
+ "/" + TestLocalFileSystemPermission.class.getSimpleName() + "_";
protected Path getTestRootRelativePath(String pathString) {
return fc.makeQualified(new Path(TEST_ROOT_DIR, pathString));
}
private Path rootPath = null;
protected Path getTestRootPath() {
if (rootPath == null) {
rootPath = fc.makeQualified(new Path(TEST_ROOT_DIR));
}
return rootPath;
}
{
try {
((org.apache.commons.logging.impl.Log4JLogger)FileSystem.LOG).getLogger()
.setLevel(org.apache.log4j.Level.DEBUG);
}
catch(Exception e) {
System.out.println("Cannot change log level\n"
+ StringUtils.stringifyException(e));
}
}
static FileContext fc;
@Before
public void setUp() throws Exception {
fc.mkdir(getTestRootPath(), FileContext.DEFAULT_PERM, true);
}
@After
public void tearDown() throws Exception {
fc.delete(getTestRootPath(), true);
}
private Path writeFile(FileContext theFc, String name) throws IOException {
Path f = getTestRootRelativePath(name);
FSDataOutputStream stm = theFc.create(f, EnumSet.of(CreateFlag.CREATE));
stm.writeBytes("42\n");
stm.close();
return f;
}
private void cleanupFile(FileContext theFc, Path name) throws IOException {
Assert.assertTrue(theFc.exists(name));
theFc.delete(name, true);
Assert.assertTrue(!theFc.exists(name));
}
@Test
public void testCreatePermission() throws IOException {
if (Path.WINDOWS) {
System.out.println("Cannot run test for Windows");
return;
}
String filename = "foo";
Path f = writeFile(fc, filename);
doFilePermissionCheck(FileContext.DEFAULT_PERM.applyUMask(fc.getUMask()),
fc.getFileStatus(f).getPermission());
}
@Test
public void testSetPermission() throws IOException {
if (Path.WINDOWS) {
System.out.println("Cannot run test for Windows");
return;
}
String filename = "foo";
Path f = writeFile(fc, filename);
try {
// create files and manipulate them.
FsPermission all = new FsPermission((short)0777);
FsPermission none = new FsPermission((short)0);
fc.setPermission(f, none);
doFilePermissionCheck(none, fc.getFileStatus(f).getPermission());
fc.setPermission(f, all);
doFilePermissionCheck(all, fc.getFileStatus(f).getPermission());
}
finally {cleanupFile(fc, f);}
}
@Test
public void testSetOwner() throws IOException {
if (Path.WINDOWS) {
System.out.println("Cannot run test for Windows");
return;
}
String filename = "bar";
Path f = writeFile(fc, filename);
List<String> groups = null;
try {
groups = getGroups();
System.out.println(filename + ": " + fc.getFileStatus(f).getPermission());
}
catch(IOException e) {
System.out.println(StringUtils.stringifyException(e));
System.out.println("Cannot run test");
return;
}
if (groups == null || groups.size() < 1) {
System.out.println("Cannot run test: need at least one group. groups="
+ groups);
return;
}
// create files and manipulate them.
try {
String g0 = groups.get(0);
fc.setOwner(f, null, g0);
Assert.assertEquals(g0, fc.getFileStatus(f).getGroup());
if (groups.size() > 1) {
String g1 = groups.get(1);
fc.setOwner(f, null, g1);
Assert.assertEquals(g1, fc.getFileStatus(f).getGroup());
} else {
System.out.println("Not testing changing the group since user " +
"belongs to only one group.");
}
}
finally {cleanupFile(fc, f);}
}
static List<String> getGroups() throws IOException {
List<String> a = new ArrayList<String>();
String s = Shell.execCommand(Shell.getGROUPS_COMMAND());
for(StringTokenizer t = new StringTokenizer(s); t.hasMoreTokens(); ) {
a.add(t.nextToken());
}
return a;
}
void doFilePermissionCheck(FsPermission expectedPerm, FsPermission actualPerm) {
Assert.assertEquals(expectedPerm.applyUMask(getFileMask()), actualPerm);
}
/*
* Some filesystems like HDFS ignore the "x" bit of the permission.
* Others, like localFs, do not.
* Override the method below if the file system being tested masks out
* certain bits of the file permissions.
*/
static final FsPermission FILE_MASK_ZERO = new FsPermission((short) 0);
FsPermission getFileMask() {
return FILE_MASK_ZERO;
}
}

View File

@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.junit.After;
import org.junit.Before;
/**
* Test permissions for localFs using FileContext API.
*/
public class TestFcLocalFsPermission extends
FileContextPermissionBase {
@Before
public void setUp() throws Exception {
fc = FileContext.getLocalFSFileContext();
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
}
}

View File

@@ -70,8 +70,8 @@ public class TestFileContextDeleteOnExit {
}
private void checkDeleteOnExitData(int size, FileContext fc, Path... paths) {
Assert.assertEquals(size, FileContext.deleteOnExit.size());
Set<Path> set = FileContext.deleteOnExit.get(fc);
Assert.assertEquals(size, FileContext.DELETE_ON_EXIT.size());
Set<Path> set = FileContext.DELETE_ON_EXIT.get(fc);
Assert.assertEquals(paths.length, (set == null ? 0 : set.size()));
for (Path path : paths) {
Assert.assertTrue(set.contains(path));
@@ -87,7 +87,7 @@ public class TestFileContextDeleteOnExit {
checkDeleteOnExitData(1, fc, file1);
// Ensure shutdown hook is added
Assert.assertTrue(Runtime.getRuntime().removeShutdownHook(FileContext.finalizer));
Assert.assertTrue(Runtime.getRuntime().removeShutdownHook(FileContext.FINALIZER));
Path file2 = getTestPath("dir1/file2");
createFile(fc, file2);
@@ -101,8 +101,8 @@ public class TestFileContextDeleteOnExit {
// trigger deleteOnExit and ensure the registered
// paths are cleaned up
FileContext.finalizer.start();
FileContext.finalizer.join();
FileContext.FINALIZER.start();
FileContext.FINALIZER.join();
checkDeleteOnExitData(0, fc, new Path[0]);
Assert.assertFalse(fc.exists(file1));
Assert.assertFalse(fc.exists(file2));