HADOOP-7087. SequenceFile.createWriter ignores FileSystem parameter. Contributed by Todd Lipcon
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1057789 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a42c891055
commit
496b24d30b
|
@ -425,6 +425,8 @@ Release 0.22.0 - Unreleased
|
||||||
HADOOP-7070. JAAS configuration should delegate unknown application names
|
HADOOP-7070. JAAS configuration should delegate unknown application names
|
||||||
to pre-existing configuration. (todd)
|
to pre-existing configuration. (todd)
|
||||||
|
|
||||||
|
HADOOP-7087. SequenceFile.createWriter ignores FileSystem parameter (todd)
|
||||||
|
|
||||||
Release 0.21.1 - Unreleased
|
Release 0.21.1 - Unreleased
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -288,7 +288,8 @@ public class SequenceFile {
|
||||||
public static Writer
|
public static Writer
|
||||||
createWriter(FileSystem fs, Configuration conf, Path name,
|
createWriter(FileSystem fs, Configuration conf, Path name,
|
||||||
Class keyClass, Class valClass) throws IOException {
|
Class keyClass, Class valClass) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.filesystem(fs),
|
||||||
|
Writer.file(name), Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass));
|
Writer.valueClass(valClass));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -310,7 +311,8 @@ public class SequenceFile {
|
||||||
createWriter(FileSystem fs, Configuration conf, Path name,
|
createWriter(FileSystem fs, Configuration conf, Path name,
|
||||||
Class keyClass, Class valClass,
|
Class keyClass, Class valClass,
|
||||||
CompressionType compressionType) throws IOException {
|
CompressionType compressionType) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.filesystem(fs),
|
||||||
|
Writer.file(name), Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass),
|
Writer.valueClass(valClass),
|
||||||
Writer.compression(compressionType));
|
Writer.compression(compressionType));
|
||||||
}
|
}
|
||||||
|
@ -334,7 +336,9 @@ public class SequenceFile {
|
||||||
createWriter(FileSystem fs, Configuration conf, Path name,
|
createWriter(FileSystem fs, Configuration conf, Path name,
|
||||||
Class keyClass, Class valClass, CompressionType compressionType,
|
Class keyClass, Class valClass, CompressionType compressionType,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.file(name),
|
||||||
|
Writer.filesystem(fs),
|
||||||
|
Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass),
|
Writer.valueClass(valClass),
|
||||||
Writer.compression(compressionType),
|
Writer.compression(compressionType),
|
||||||
Writer.progressable(progress));
|
Writer.progressable(progress));
|
||||||
|
@ -359,7 +363,9 @@ public class SequenceFile {
|
||||||
createWriter(FileSystem fs, Configuration conf, Path name,
|
createWriter(FileSystem fs, Configuration conf, Path name,
|
||||||
Class keyClass, Class valClass, CompressionType compressionType,
|
Class keyClass, Class valClass, CompressionType compressionType,
|
||||||
CompressionCodec codec) throws IOException {
|
CompressionCodec codec) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.file(name),
|
||||||
|
Writer.filesystem(fs),
|
||||||
|
Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass),
|
Writer.valueClass(valClass),
|
||||||
Writer.compression(compressionType, codec));
|
Writer.compression(compressionType, codec));
|
||||||
}
|
}
|
||||||
|
@ -386,7 +392,9 @@ public class SequenceFile {
|
||||||
Class keyClass, Class valClass,
|
Class keyClass, Class valClass,
|
||||||
CompressionType compressionType, CompressionCodec codec,
|
CompressionType compressionType, CompressionCodec codec,
|
||||||
Progressable progress, Metadata metadata) throws IOException {
|
Progressable progress, Metadata metadata) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.file(name),
|
||||||
|
Writer.filesystem(fs),
|
||||||
|
Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass),
|
Writer.valueClass(valClass),
|
||||||
Writer.compression(compressionType, codec),
|
Writer.compression(compressionType, codec),
|
||||||
Writer.progressable(progress),
|
Writer.progressable(progress),
|
||||||
|
@ -419,7 +427,9 @@ public class SequenceFile {
|
||||||
short replication, long blockSize,
|
short replication, long blockSize,
|
||||||
CompressionType compressionType, CompressionCodec codec,
|
CompressionType compressionType, CompressionCodec codec,
|
||||||
Progressable progress, Metadata metadata) throws IOException {
|
Progressable progress, Metadata metadata) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.file(name),
|
||||||
|
Writer.filesystem(fs),
|
||||||
|
Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass),
|
Writer.valueClass(valClass),
|
||||||
Writer.bufferSize(bufferSize),
|
Writer.bufferSize(bufferSize),
|
||||||
Writer.replication(replication),
|
Writer.replication(replication),
|
||||||
|
@ -450,7 +460,9 @@ public class SequenceFile {
|
||||||
Class keyClass, Class valClass,
|
Class keyClass, Class valClass,
|
||||||
CompressionType compressionType, CompressionCodec codec,
|
CompressionType compressionType, CompressionCodec codec,
|
||||||
Progressable progress) throws IOException {
|
Progressable progress) throws IOException {
|
||||||
return createWriter(conf, Writer.file(name), Writer.keyClass(keyClass),
|
return createWriter(conf, Writer.file(name),
|
||||||
|
Writer.filesystem(fs),
|
||||||
|
Writer.keyClass(keyClass),
|
||||||
Writer.valueClass(valClass),
|
Writer.valueClass(valClass),
|
||||||
Writer.compression(compressionType, codec),
|
Writer.compression(compressionType, codec),
|
||||||
Writer.progressable(progress));
|
Writer.progressable(progress));
|
||||||
|
@ -777,6 +789,21 @@ public class SequenceFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated only used for backwards-compatibility in the createWriter methods
|
||||||
|
* that take FileSystem.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
private static class FileSystemOption implements Option {
|
||||||
|
private final FileSystem value;
|
||||||
|
protected FileSystemOption(FileSystem value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
public FileSystem getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class StreamOption extends Options.FSDataOutputStreamOption
|
static class StreamOption extends Options.FSDataOutputStreamOption
|
||||||
implements Option {
|
implements Option {
|
||||||
StreamOption(FSDataOutputStream stream) {
|
StreamOption(FSDataOutputStream stream) {
|
||||||
|
@ -857,6 +884,15 @@ public class SequenceFile {
|
||||||
public static Option file(Path value) {
|
public static Option file(Path value) {
|
||||||
return new FileOption(value);
|
return new FileOption(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated only used for backwards-compatibility in the createWriter methods
|
||||||
|
* that take FileSystem.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
private static Option filesystem(FileSystem fs) {
|
||||||
|
return new SequenceFile.Writer.FileSystemOption(fs);
|
||||||
|
}
|
||||||
|
|
||||||
public static Option bufferSize(int value) {
|
public static Option bufferSize(int value) {
|
||||||
return new BufferSizeOption(value);
|
return new BufferSizeOption(value);
|
||||||
|
@ -916,6 +952,7 @@ public class SequenceFile {
|
||||||
ProgressableOption progressOption =
|
ProgressableOption progressOption =
|
||||||
Options.getOption(ProgressableOption.class, opts);
|
Options.getOption(ProgressableOption.class, opts);
|
||||||
FileOption fileOption = Options.getOption(FileOption.class, opts);
|
FileOption fileOption = Options.getOption(FileOption.class, opts);
|
||||||
|
FileSystemOption fsOption = Options.getOption(FileSystemOption.class, opts);
|
||||||
StreamOption streamOption = Options.getOption(StreamOption.class, opts);
|
StreamOption streamOption = Options.getOption(StreamOption.class, opts);
|
||||||
KeyClassOption keyClassOption =
|
KeyClassOption keyClassOption =
|
||||||
Options.getOption(KeyClassOption.class, opts);
|
Options.getOption(KeyClassOption.class, opts);
|
||||||
|
@ -941,7 +978,12 @@ public class SequenceFile {
|
||||||
boolean ownStream = fileOption != null;
|
boolean ownStream = fileOption != null;
|
||||||
if (ownStream) {
|
if (ownStream) {
|
||||||
Path p = fileOption.getValue();
|
Path p = fileOption.getValue();
|
||||||
FileSystem fs = p.getFileSystem(conf);
|
FileSystem fs;
|
||||||
|
if (fsOption != null) {
|
||||||
|
fs = fsOption.getValue();
|
||||||
|
} else {
|
||||||
|
fs = p.getFileSystem(conf);
|
||||||
|
}
|
||||||
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
|
int bufferSize = bufferSizeOption == null ? getBufferSize(conf) :
|
||||||
bufferSizeOption.getValue();
|
bufferSizeOption.getValue();
|
||||||
short replication = replicationOption == null ?
|
short replication = replicationOption == null ?
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.io.compress.DefaultCodec;
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.conf.*;
|
import org.apache.hadoop.conf.*;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
|
||||||
/** Support for flat files of binary key/value pairs. */
|
/** Support for flat files of binary key/value pairs. */
|
||||||
|
@ -457,6 +458,20 @@ public class TestSequenceFile extends TestCase {
|
||||||
assertFalse(reader2.next(text));
|
assertFalse(reader2.next(text));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that makes sure the FileSystem passed to createWriter
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testCreateUsesFsArg() throws Exception {
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
FileSystem spyFs = Mockito.spy(fs);
|
||||||
|
Path p = new Path(System.getProperty("test.build.data", ".")+"/testCreateUsesFSArg.seq");
|
||||||
|
SequenceFile.Writer writer = SequenceFile.createWriter(
|
||||||
|
spyFs, conf, p, NullWritable.class, NullWritable.class);
|
||||||
|
writer.close();
|
||||||
|
Mockito.verify(spyFs).getDefaultReplication();
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestFSDataInputStream extends FSDataInputStream {
|
private static class TestFSDataInputStream extends FSDataInputStream {
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue