HADOOP-10029. Specifying har file to MR job fails in secure cluster. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1531125 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
22b332ff46
commit
4f0089621c
|
@ -273,7 +273,17 @@ public class HarFileSystem extends FileSystem {
|
||||||
public Path getWorkingDirectory() {
|
public Path getWorkingDirectory() {
|
||||||
return new Path(uri.toString());
|
return new Path(uri.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path getInitialWorkingDirectory() {
|
||||||
|
return getWorkingDirectory();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FsStatus getStatus(Path p) throws IOException {
|
||||||
|
return fs.getStatus(p);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a har specific auth
|
* Create a har specific auth
|
||||||
* har-underlyingfs:port
|
* har-underlyingfs:port
|
||||||
|
@ -296,9 +306,18 @@ public class HarFileSystem extends FileSystem {
|
||||||
return auth;
|
return auth;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used for delegation token related functionality. Must delegate to
|
||||||
|
* underlying file system.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
protected URI getCanonicalUri() {
|
protected URI getCanonicalUri() {
|
||||||
return fs.canonicalizeUri(getUri());
|
return fs.getCanonicalUri();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected URI canonicalizeUri(URI uri) {
|
||||||
|
return fs.canonicalizeUri(uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -311,6 +330,16 @@ public class HarFileSystem extends FileSystem {
|
||||||
return this.uri;
|
return this.uri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void checkPath(Path path) {
|
||||||
|
fs.checkPath(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path resolvePath(Path p) throws IOException {
|
||||||
|
return fs.resolvePath(p);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* this method returns the path
|
* this method returns the path
|
||||||
* inside the har filesystem.
|
* inside the har filesystem.
|
||||||
|
@ -675,18 +704,31 @@ public class HarFileSystem extends FileSystem {
|
||||||
hstatus.getPartName()),
|
hstatus.getPartName()),
|
||||||
hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
|
hstatus.getStartIndex(), hstatus.getLength(), bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used for delegation token related functionality. Must delegate to
|
||||||
|
* underlying file system.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream create(Path f,
|
public FileSystem[] getChildFileSystems() {
|
||||||
FsPermission permission,
|
return new FileSystem[]{fs};
|
||||||
boolean overwrite,
|
}
|
||||||
int bufferSize,
|
|
||||||
short replication,
|
@Override
|
||||||
long blockSize,
|
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||||
|
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
throw new IOException("Har: create not allowed.");
|
throw new IOException("Har: create not allowed.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Override
|
||||||
|
public FSDataOutputStream createNonRecursive(Path f, boolean overwrite,
|
||||||
|
int bufferSize, short replication, long blockSize, Progressable progress)
|
||||||
|
throws IOException {
|
||||||
|
throw new IOException("Har: create not allowed.");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
|
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
|
||||||
throw new IOException("Har: append not allowed.");
|
throw new IOException("Har: append not allowed.");
|
||||||
|
@ -694,6 +736,7 @@ public class HarFileSystem extends FileSystem {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
super.close();
|
||||||
if (fs != null) {
|
if (fs != null) {
|
||||||
try {
|
try {
|
||||||
fs.close();
|
fs.close();
|
||||||
|
@ -781,11 +824,17 @@ public class HarFileSystem extends FileSystem {
|
||||||
* not implemented.
|
* not implemented.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws
|
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
|
||||||
IOException {
|
Path src, Path dst) throws IOException {
|
||||||
throw new IOException("Har: copyfromlocalfile not allowed");
|
throw new IOException("Har: copyfromlocalfile not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void copyFromLocalFile(boolean delSrc, boolean overwrite,
|
||||||
|
Path[] srcs, Path dst) throws IOException {
|
||||||
|
throw new IOException("Har: copyfromlocalfile not allowed");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* copies the file in the har filesystem to a local file.
|
* copies the file in the har filesystem to a local file.
|
||||||
*/
|
*/
|
||||||
|
@ -822,6 +871,11 @@ public class HarFileSystem extends FileSystem {
|
||||||
throw new IOException("Har: setowner not allowed");
|
throw new IOException("Har: setowner not allowed");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setTimes(Path p, long mtime, long atime) throws IOException {
|
||||||
|
throw new IOException("Har: setTimes not allowed");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Not implemented.
|
* Not implemented.
|
||||||
*/
|
*/
|
||||||
|
@ -1147,4 +1201,43 @@ public class HarFileSystem extends FileSystem {
|
||||||
return size() > MAX_ENTRIES;
|
return size() > MAX_ENTRIES;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Override
|
||||||
|
public FsServerDefaults getServerDefaults() throws IOException {
|
||||||
|
return fs.getServerDefaults();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FsServerDefaults getServerDefaults(Path f) throws IOException {
|
||||||
|
return fs.getServerDefaults(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUsed() throws IOException{
|
||||||
|
return fs.getUsed();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Override
|
||||||
|
public long getDefaultBlockSize() {
|
||||||
|
return fs.getDefaultBlockSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Override
|
||||||
|
public long getDefaultBlockSize(Path f) {
|
||||||
|
return fs.getDefaultBlockSize(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
@Override
|
||||||
|
public short getDefaultReplication() {
|
||||||
|
return fs.getDefaultReplication();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public short getDefaultReplication(Path f) {
|
||||||
|
return fs.getDefaultReplication(f);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,155 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs;
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import static org.junit.Assert.*;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.lang.reflect.Modifier;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||||
|
import static org.apache.hadoop.fs.Options.CreateOpts;
|
||||||
|
import static org.apache.hadoop.fs.Options.Rename;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public class TestHarFileSystem {
|
public class TestHarFileSystem {
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestHarFileSystem.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* FileSystem methods that must not be overwritten by
|
||||||
|
* {@link HarFileSystem}. Either because there is a default implementation
|
||||||
|
* already available or because it is not relevant.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
private interface MustNotImplement {
|
||||||
|
public BlockLocation[] getFileBlockLocations(Path p, long start, long len);
|
||||||
|
public long getLength(Path f);
|
||||||
|
public FSDataOutputStream append(Path f, int bufferSize);
|
||||||
|
public void rename(Path src, Path dst, Rename... options);
|
||||||
|
public boolean exists(Path f);
|
||||||
|
public boolean isDirectory(Path f);
|
||||||
|
public boolean isFile(Path f);
|
||||||
|
public boolean createNewFile(Path f);
|
||||||
|
|
||||||
|
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
|
||||||
|
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||||
|
Progressable progress) throws IOException;
|
||||||
|
|
||||||
|
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
|
||||||
|
Progressable progress) throws IOException;
|
||||||
|
|
||||||
|
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
|
||||||
|
Progressable progress, ChecksumOpt checksumOpt);
|
||||||
|
|
||||||
|
public boolean mkdirs(Path f);
|
||||||
|
public FSDataInputStream open(Path f);
|
||||||
|
public FSDataOutputStream create(Path f);
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite);
|
||||||
|
public FSDataOutputStream create(Path f, Progressable progress);
|
||||||
|
public FSDataOutputStream create(Path f, short replication);
|
||||||
|
public FSDataOutputStream create(Path f, short replication,
|
||||||
|
Progressable progress);
|
||||||
|
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite,
|
||||||
|
int bufferSize);
|
||||||
|
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
|
||||||
|
Progressable progress);
|
||||||
|
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
|
||||||
|
short replication, long blockSize);
|
||||||
|
|
||||||
|
public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
|
||||||
|
short replication, long blockSize, Progressable progress);
|
||||||
|
|
||||||
|
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags, int bufferSize, short replication,
|
||||||
|
long blockSize, Progressable progress) throws IOException;
|
||||||
|
|
||||||
|
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||||
|
EnumSet<CreateFlag> flags, int bufferSize, short replication,
|
||||||
|
long blockSize, Progressable progress, ChecksumOpt checksumOpt)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
public String getName();
|
||||||
|
public boolean delete(Path f);
|
||||||
|
public short getReplication(Path src);
|
||||||
|
public void processDeleteOnExit();
|
||||||
|
public ContentSummary getContentSummary(Path f);
|
||||||
|
public FsStatus getStatus();
|
||||||
|
public FileStatus[] listStatus(Path f, PathFilter filter);
|
||||||
|
public FileStatus[] listStatus(Path[] files);
|
||||||
|
public FileStatus[] listStatus(Path[] files, PathFilter filter);
|
||||||
|
public FileStatus[] globStatus(Path pathPattern);
|
||||||
|
public FileStatus[] globStatus(Path pathPattern, PathFilter filter);
|
||||||
|
|
||||||
|
public Iterator<LocatedFileStatus> listFiles(Path path,
|
||||||
|
boolean isRecursive);
|
||||||
|
|
||||||
|
public Iterator<LocatedFileStatus> listLocatedStatus(Path f);
|
||||||
|
public Iterator<LocatedFileStatus> listLocatedStatus(Path f,
|
||||||
|
PathFilter filter);
|
||||||
|
public void copyFromLocalFile(Path src, Path dst);
|
||||||
|
public void moveFromLocalFile(Path[] srcs, Path dst);
|
||||||
|
public void moveFromLocalFile(Path src, Path dst);
|
||||||
|
public void copyToLocalFile(Path src, Path dst);
|
||||||
|
public void copyToLocalFile(boolean delSrc, Path src, Path dst,
|
||||||
|
boolean useRawLocalFileSystem);
|
||||||
|
public void moveToLocalFile(Path src, Path dst);
|
||||||
|
public long getBlockSize(Path f);
|
||||||
|
public FSDataOutputStream primitiveCreate(Path f,
|
||||||
|
EnumSet<CreateFlag> createFlag, CreateOpts... opts);
|
||||||
|
public void primitiveMkdir(Path f, FsPermission absolutePermission,
|
||||||
|
boolean createParent);
|
||||||
|
public int getDefaultPort();
|
||||||
|
public String getCanonicalServiceName();
|
||||||
|
public Token<?> getDelegationToken(String renewer) throws IOException;
|
||||||
|
public boolean deleteOnExit(Path f) throws IOException;
|
||||||
|
public boolean cancelDeleteOnExit(Path f) throws IOException;
|
||||||
|
public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
|
||||||
|
throws IOException;
|
||||||
|
public Path fixRelativePart(Path p);
|
||||||
|
public void concat(Path trg, Path [] psrcs) throws IOException;
|
||||||
|
public FSDataOutputStream primitiveCreate(Path f,
|
||||||
|
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
|
||||||
|
short replication, long blockSize, Progressable progress,
|
||||||
|
ChecksumOpt checksumOpt) throws IOException;
|
||||||
|
public boolean primitiveMkdir(Path f, FsPermission absolutePermission)
|
||||||
|
throws IOException;
|
||||||
|
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
|
||||||
|
throws IOException;
|
||||||
|
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
|
||||||
|
throws IOException;
|
||||||
|
public void createSymlink(Path target, Path link, boolean createParent)
|
||||||
|
throws IOException;
|
||||||
|
public FileStatus getFileLinkStatus(Path f) throws IOException;
|
||||||
|
public boolean supportsSymlinks();
|
||||||
|
public Path getLinkTarget(Path f) throws IOException;
|
||||||
|
public Path resolveLink(Path f) throws IOException;
|
||||||
|
public void setVerifyChecksum(boolean verifyChecksum);
|
||||||
|
public void setWriteChecksum(boolean writeChecksum);
|
||||||
|
public Path createSnapshot(Path path, String snapshotName) throws
|
||||||
|
IOException;
|
||||||
|
public void renameSnapshot(Path path, String snapshotOldName,
|
||||||
|
String snapshotNewName) throws IOException;
|
||||||
|
public void deleteSnapshot(Path path, String snapshotName)
|
||||||
|
throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHarUri() {
|
public void testHarUri() {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
|
@ -44,8 +185,7 @@ public class TestHarFileSystem {
|
||||||
p.getFileSystem(conf);
|
p.getFileSystem(conf);
|
||||||
Assert.fail(p + " is an invalid path.");
|
Assert.fail(p + " is an invalid path.");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
System.out.println("GOOD: Got an exception.");
|
// Expected
|
||||||
e.printStackTrace(System.out);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,6 +273,37 @@ public class TestHarFileSystem {
|
||||||
assertEquals(b[1].getOffset(), 128);
|
assertEquals(b[1].getOffset(), 128);
|
||||||
assertEquals(b[1].getLength(), 384);
|
assertEquals(b[1].getLength(), 384);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInheritedMethodsImplemented() throws Exception {
|
||||||
|
int errors = 0;
|
||||||
|
for (Method m : FileSystem.class.getDeclaredMethods()) {
|
||||||
|
if (Modifier.isStatic(m.getModifiers()) ||
|
||||||
|
Modifier.isPrivate(m.getModifiers()) ||
|
||||||
|
Modifier.isFinal(m.getModifiers())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
MustNotImplement.class.getMethod(m.getName(), m.getParameterTypes());
|
||||||
|
try {
|
||||||
|
HarFileSystem.class.getDeclaredMethod(m.getName(), m.getParameterTypes());
|
||||||
|
LOG.error("HarFileSystem MUST not implement " + m);
|
||||||
|
errors++;
|
||||||
|
} catch (NoSuchMethodException ex) {
|
||||||
|
// Expected
|
||||||
|
}
|
||||||
|
} catch (NoSuchMethodException exc) {
|
||||||
|
try {
|
||||||
|
HarFileSystem.class.getDeclaredMethod(m.getName(), m.getParameterTypes());
|
||||||
|
} catch (NoSuchMethodException exc2) {
|
||||||
|
LOG.error("HarFileSystem MUST implement " + m);
|
||||||
|
errors++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue((errors + " methods were not overridden correctly - see log"),
|
||||||
|
errors <= 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue