HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support reading on un-closed file.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@886003 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze  2009-12-01 23:25:56 +00:00
parent ff6c0ef547
commit 98beeca09d
3 changed files with 76 additions and 22 deletions

CHANGES.txt

@@ -684,6 +684,9 @@ Release 0.21.0 - Unreleased
     HADOOP-6261. Add URI based tests for FileContext.
     (Ravi Pulari via suresh).
 
+    HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support
+    reading on un-closed file.  (szetszwo)
+
   BUG FIXES
 
     HADOOP-5379. CBZip2InputStream to throw IOException on data crc error.

org/apache/hadoop/io/SequenceFile.java

@@ -1435,32 +1435,71 @@ public class SequenceFile {
     private DeserializerBase keyDeserializer;
     private DeserializerBase valDeserializer;
 
-    /** Open the named file. */
+    /**
+     * Construct a reader by opening a file from the given file system.
+     * @param fs The file system used to open the file.
+     * @param file The file being read.
+     * @param conf Configuration
+     * @throws IOException
+     */
     public Reader(FileSystem fs, Path file, Configuration conf)
       throws IOException {
       this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
     }
 
-    private Reader(FileSystem fs, Path file, int bufferSize,
-        Configuration conf, boolean tempReader) throws IOException {
-      this(fs, file, bufferSize, 0, fs.getFileStatus(file).getLen(), conf, tempReader);
+    /**
+     * Construct a reader by the given input stream.
+     * @param in An input stream.
+     * @param buffersize The buffer size used to read the file.
+     * @param start The starting position.
+     * @param length The length being read.
+     * @param conf Configuration
+     * @throws IOException
+     */
+    public Reader(FSDataInputStream in, int buffersize,
+        long start, long length, Configuration conf) throws IOException {
+      this(null, null, in, buffersize, start, length, conf, false);
     }
 
-    private Reader(FileSystem fs, Path file, int bufferSize, long start,
-        long length, Configuration conf, boolean tempReader)
-      throws IOException {
+    private Reader(FileSystem fs, Path file, int bufferSize,
+        Configuration conf, boolean tempReader) throws IOException {
+      this(fs, file, null, bufferSize, 0, fs.getFileStatus(file).getLen(),
+          conf, tempReader);
+    }
+
+    /**
+     * Private constructor.
+     * @param fs The file system used to open the file.
+     *           It is not used if the given input stream is not null.
+     * @param file The file being read.
+     * @param in An input stream of the file.  If it is null,
+     *           the file will be opened from the given file system.
+     * @param bufferSize The buffer size used to read the file.
+     * @param start The starting position.
+     * @param length The length being read.
+     * @param conf Configuration
+     * @param tempReader Is this temporary?
+     * @throws IOException
+     */
+    private Reader(FileSystem fs, Path file, FSDataInputStream in,
+        int bufferSize, long start, long length, Configuration conf,
+        boolean tempReader) throws IOException {
+      if (fs == null && in == null) {
+        throw new IllegalArgumentException("fs == null && in == null");
+      }
+
       this.file = file;
-      this.in = openFile(fs, file, bufferSize, length);
+      this.in = in != null? in: openFile(fs, file, bufferSize, length);
       this.conf = conf;
       boolean succeeded = false;
       try {
         seek(start);
-        this.end = in.getPos() + length;
+        this.end = this.in.getPos() + length;
         init(tempReader);
         succeeded = true;
       } finally {
         if (!succeeded) {
-          IOUtils.cleanup(LOG, in);
+          IOUtils.cleanup(LOG, this.in);
         }
       }
     }
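
The public Reader(FSDataInputStream, ...) constructor added above is what enables reading a file that has not yet been closed: the caller opens the stream itself and supplies the readable length explicitly instead of having the reader consult the file status. A minimal usage sketch follows; the path and the way the length is obtained are illustrative assumptions, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class UnclosedSequenceFileRead {
  public static void main(String[] args) throws Exception {
    final Configuration conf = new Configuration();
    final FileSystem fs = FileSystem.get(conf);
    final Path path = new Path("/tmp/still-being-written.seq");  // hypothetical path

    // Open the stream directly and bound the read by an explicit length;
    // the file does not have to be closed first.  The length is taken from
    // FileStatus here for simplicity; a real caller would pass whatever
    // length it knows to be readable.
    final FSDataInputStream in = fs.open(path);
    final long length = fs.getFileStatus(path).getLen();
    final int buffersize = conf.getInt("io.file.buffer.size", 4096);

    final SequenceFile.Reader reader =
        new SequenceFile.Reader(in, buffersize, 0L, length, conf);
    try {
      final Text key = new Text();
      final Text value = new Text();
      while (reader.next(key, value)) {   // iterate records up to 'length'
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}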
@@ -1468,6 +1507,13 @@ public class SequenceFile {
     /**
      * Override this method to specialize the type of
      * {@link FSDataInputStream} returned.
+     * @param fs The file system used to open the file.
+     * @param file The file being read.
+     * @param bufferSize The buffer size used to read the file.
+     * @param length The length being read if it is >= 0.  Otherwise,
+     *               the length is not available.
+     * @return The opened stream.
+     * @throws IOException
      */
     protected FSDataInputStream openFile(FileSystem fs, Path file,
         int bufferSize, long length) throws IOException {
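
Since the javadoc above now spells out the openFile contract, here is a rough sketch of how a subclass might use this protected hook to observe or specialize the stream the reader uses. The subclass name and the logging are hypothetical and not part of this commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical subclass: overrides openFile to log how the underlying
// FSDataInputStream is obtained before delegating to the default behavior.
public class TracingSequenceFileReader extends SequenceFile.Reader {

  public TracingSequenceFileReader(FileSystem fs, Path file, Configuration conf)
      throws IOException {
    super(fs, file, conf);
  }

  @Override
  protected FSDataInputStream openFile(FileSystem fs, Path file,
      int bufferSize, long length) throws IOException {
    // Per the javadoc, a negative length means the length is not available.
    System.out.println("opening " + file + ", bufferSize=" + bufferSize
        + (length >= 0 ? ", length=" + length : ", length unknown"));
    return super.openFile(fs, file, bufferSize, length);
  }
}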
@@ -1489,7 +1535,7 @@ public class SequenceFile {
       if ((versionBlock[0] != VERSION[0]) ||
           (versionBlock[1] != VERSION[1]) ||
           (versionBlock[2] != VERSION[2]))
-        throw new IOException(file + " not a SequenceFile");
+        throw new IOException(this + " not a SequenceFile");
 
       // Set 'version'
       version = versionBlock[3];
@@ -2251,7 +2297,7 @@ public class SequenceFile {
     /** Returns the name of the file. */
     public String toString() {
-      return file.toString();
+      return file == null? "<unknown>": file.toString();
     }
   }
@@ -3132,7 +3178,7 @@ public class SequenceFile {
         if (fs.getUri().getScheme().startsWith("ramfs")) {
           bufferSize = conf.getInt("io.bytes.per.checksum", 512);
         }
-        Reader reader = new Reader(fs, segmentPathName,
+        Reader reader = new Reader(fs, segmentPathName, null,
             bufferSize, segmentOffset,
             segmentLength, conf, false);

org/apache/hadoop/io/TestSequenceFileSync.java

@@ -18,22 +18,17 @@
 
 package org.apache.hadoop.io;
 
-import java.io.File;
+import static org.junit.Assert.assertEquals;
 import java.io.IOException;
 import java.util.Random;
-import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import static org.junit.Assert.*;
-
 public class TestSequenceFileSync {
   private static final int NUMRECORDS = 2000;
   private static final int RECORDSIZE = 80;
@@ -66,8 +61,18 @@ public class TestSequenceFileSync {
     try {
       writeSequenceFile(writer, NUMRECORDS);
       for (int i = 0; i < 5 ; i++) {
-        final SequenceFile.Reader reader =
-          new SequenceFile.Reader(fs, path, conf);
+        final SequenceFile.Reader reader;
+
+        //try different SequenceFile.Reader constructors
+        if (i % 2 == 0) {
+          reader = new SequenceFile.Reader(fs, path, conf);
+        } else {
+          final FSDataInputStream in = fs.open(path);
+          final long length = fs.getFileStatus(path).getLen();
+          final int buffersize = conf.getInt("io.file.buffer.size", 4096);
+          reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
+        }
+
         try {
           forOffset(reader, input, val, i, 0, 0);
           forOffset(reader, input, val, i, 65, 0);