removing dependency on NativeS3FileSystem and other file systems

This commit is contained in:
Himanshu Gupta 2015-02-22 11:17:58 -06:00
parent 7c02212584
commit 01a4f19ea2
1 changed files with 12 additions and 7 deletions

View File

@ -54,10 +54,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InvalidJobConfException;
@ -467,24 +464,32 @@ public class IndexGeneratorJob implements Jobby
Path finalIndexZipFilePath = new Path(indexBasePath, "index.zip");
final URI indexOutURI = finalIndexZipFilePath.toUri();
ImmutableMap<String, Object> loadSpec;
if (outputFS instanceof NativeS3FileSystem) {
// We do String comparison instead of instanceof checks here because in Hadoop 2.6.0
// NativeS3FileSystem got moved to a separate jar (hadoop-aws) that is not guaranteed
// to be part of the core code anymore. The instanceof check requires that the class exist
// but we do not have any guarantee that it will exist, so instead we must pull out
// the String name of it and verify that. We do a full package-qualified test in order
// to be as explicit as possible.
String fsClazz = outputFS.getClass().getName();
if ("org.apache.hadoop.fs.s3native.NativeS3FileSystem".equals(fsClazz)) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", indexOutURI.getHost(),
"key", indexOutURI.getPath().substring(1) // remove the leading "/"
);
} else if (outputFS instanceof LocalFileSystem) {
} else if ("org.apache.hadoop.fs.LocalFileSystem".equals(fsClazz)) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "local",
"path", indexOutURI.getPath()
);
} else if (outputFS instanceof DistributedFileSystem) {
} else if ("org.apache.hadoop.hdfs.DistributedFileSystem".equals(fsClazz)) {
loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", indexOutURI.toString()
);
} else {
throw new ISE("Unknown file system[%s]", outputFS.getClass());
throw new ISE("Unknown file system[%s]", fsClazz);
}
DataSegment segment = new DataSegment(