MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader (Mariappan Asokan and BitsOfInfo via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540931 13f79535-47bb-0310-9956-ffa450edef68
@@ -164,6 +164,9 @@ Release 2.3.0 - UNRELEASED

    MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe)

    MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader
    (Mariappan Asokan and BitsOfInfo via Sandy Ryza)

  OPTIMIZATIONS

    MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@@ -0,0 +1,97 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

/**
 * FixedLengthInputFormat is an input format used to read input files
 * which contain fixed length records.  The content of a record need not be
 * text.  It can be arbitrary binary data.  Users must configure the record
 * length property by calling:
 * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br><br> or
 * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
 * <br><br>
 * @see FixedLengthRecordReader
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FixedLengthInputFormat
    extends FileInputFormat<LongWritable, BytesWritable>
    implements JobConfigurable {

  private CompressionCodecFactory compressionCodecs = null;

  public static final String FIXED_RECORD_LENGTH =
      "fixedlengthinputformat.record.length";

  /**
   * Set the length of each record
   * @param conf configuration
   * @param recordLength the length of a record
   */
  public static void setRecordLength(Configuration conf, int recordLength) {
    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
  }

  /**
   * Get record length value
   * @param conf configuration
   * @return the record length, zero means none was set
   */
  public static int getRecordLength(Configuration conf) {
    return conf.getInt(FIXED_RECORD_LENGTH, 0);
  }

  @Override
  public void configure(JobConf conf) {
    compressionCodecs = new CompressionCodecFactory(conf);
  }

  @Override
  public RecordReader<LongWritable, BytesWritable>
      getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(genericSplit.toString());
    int recordLength = getRecordLength(job);
    if (recordLength <= 0) {
      throw new IOException("Fixed record length " + recordLength
          + " is invalid.  It should be set to a value greater than zero");
    }
    return new FixedLengthRecordReader(job, (FileSplit)genericSplit,
                                       recordLength);
  }

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    final CompressionCodec codec = compressionCodecs.getCodec(file);
    return(null == codec);
  }

}
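Note (not part of the commit): a minimal driver sketch showing how the old-API format above might be wired into a JobConf; the input path and record length are hypothetical examples.

// Hedged sketch, assuming a 100-byte fixed-width input under /data/fixed-width.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FixedLengthInputFormat;
import org.apache.hadoop.mapred.JobConf;

public class FixedLengthDriverSketch {
  public static JobConf configure() {
    JobConf job = new JobConf();
    // Every record is exactly 100 bytes; no delimiters are expected in the data.
    FixedLengthInputFormat.setRecordLength(job, 100);
    job.setInputFormat(FixedLengthInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path("/data/fixed-width"));
    return job;
  }
}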
@@ -0,0 +1,89 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;

/**
 * A reader to read fixed length records from a split.  Record offset is
 * returned as key and the record as bytes is returned in value.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FixedLengthRecordReader
    implements RecordReader<LongWritable, BytesWritable> {

  private int recordLength;
  // Make use of the new API implementation to avoid code duplication.
  private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader;

  public FixedLengthRecordReader(Configuration job, FileSplit split,
                                 int recordLength) throws IOException {
    this.recordLength = recordLength;
    reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader(
        recordLength);
    reader.initialize(job, split.getStart(), split.getLength(),
                      split.getPath());
  }

  @Override
  public LongWritable createKey() {
    return new LongWritable();
  }

  @Override
  public BytesWritable createValue() {
    return new BytesWritable(new byte[recordLength]);
  }

  @Override
  public synchronized boolean next(LongWritable key, BytesWritable value)
      throws IOException {
    boolean dataRead = reader.nextKeyValue();
    if (dataRead) {
      LongWritable newKey = reader.getCurrentKey();
      BytesWritable newValue = reader.getCurrentValue();
      key.set(newKey.get());
      value.set(newValue);
    }
    return dataRead;
  }

  @Override
  public float getProgress() throws IOException {
    return reader.getProgress();
  }

  @Override
  public synchronized long getPos() throws IOException {
    return reader.getPos();
  }

  @Override
  public void close() throws IOException {
    reader.close();
  }

}
@@ -0,0 +1,90 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * FixedLengthInputFormat is an input format used to read input files
 * which contain fixed length records.  The content of a record need not be
 * text.  It can be arbitrary binary data.  Users must configure the record
 * length property by calling:
 * FixedLengthInputFormat.setRecordLength(conf, recordLength);<br><br> or
 * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
 * <br><br>
 * @see FixedLengthRecordReader
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FixedLengthInputFormat
    extends FileInputFormat<LongWritable, BytesWritable> {

  public static final String FIXED_RECORD_LENGTH =
      "fixedlengthinputformat.record.length";

  /**
   * Set the length of each record
   * @param conf configuration
   * @param recordLength the length of a record
   */
  public static void setRecordLength(Configuration conf, int recordLength) {
    conf.setInt(FIXED_RECORD_LENGTH, recordLength);
  }

  /**
   * Get record length value
   * @param conf configuration
   * @return the record length, zero means none was set
   */
  public static int getRecordLength(Configuration conf) {
    return conf.getInt(FIXED_RECORD_LENGTH, 0);
  }

  @Override
  public RecordReader<LongWritable, BytesWritable>
      createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    int recordLength = getRecordLength(context.getConfiguration());
    if (recordLength <= 0) {
      throw new IOException("Fixed record length " + recordLength
          + " is invalid.  It should be set to a value greater than zero");
    }
    return new FixedLengthRecordReader(recordLength);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return (null == codec);
  }

}
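Note (not part of the commit): a minimal sketch of driving the new-API format above from a Job; the job name, input path, and record length are hypothetical.

// Hedged sketch, assuming 50-byte records under /data/fixed-width.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;

public class NewApiDriverSketch {
  public static Job configure() throws Exception {
    Configuration conf = new Configuration();
    // Keys will be byte offsets (LongWritable); values are the raw 50-byte records.
    FixedLengthInputFormat.setRecordLength(conf, 50);
    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setInputFormatClass(FixedLengthInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/data/fixed-width"));
    return job;
  }
}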
@@ -0,0 +1,220 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * A reader to read fixed length records from a split.  Record offset is
 * returned as key and the record as bytes is returned in value.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FixedLengthRecordReader
    extends RecordReader<LongWritable, BytesWritable> {
  private static final Log LOG
      = LogFactory.getLog(FixedLengthRecordReader.class);

  private int recordLength;
  private long start;
  private long pos;
  private long end;
  private long numRecordsRemainingInSplit;
  private FSDataInputStream fileIn;
  private Seekable filePosition;
  private LongWritable key;
  private BytesWritable value;
  private boolean isCompressedInput;
  private Decompressor decompressor;
  private InputStream inputStream;

  public FixedLengthRecordReader(int recordLength) {
    this.recordLength = recordLength;
  }

  @Override
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    final Path file = split.getPath();
    initialize(job, split.getStart(), split.getLength(), file);
  }

  // This is also called from the old FixedLengthRecordReader API implementation
  public void initialize(Configuration job, long splitStart, long splitLength,
                         Path file) throws IOException {
    start = splitStart;
    end = start + splitLength;
    long partialRecordLength = start % recordLength;
    long numBytesToSkip = 0;
    if (partialRecordLength != 0) {
      numBytesToSkip = recordLength - partialRecordLength;
    }

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);

    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null != codec) {
      isCompressedInput = true;
      decompressor = CodecPool.getDecompressor(codec);
      CompressionInputStream cIn
          = codec.createInputStream(fileIn, decompressor);
      filePosition = cIn;
      inputStream = cIn;
      numRecordsRemainingInSplit = Long.MAX_VALUE;
      LOG.info(
          "Compressed input; cannot compute number of records in the split");
    } else {
      fileIn.seek(start);
      filePosition = fileIn;
      inputStream = fileIn;
      long splitSize = end - start - numBytesToSkip;
      numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength;
      if (numRecordsRemainingInSplit < 0) {
        numRecordsRemainingInSplit = 0;
      }
      LOG.info("Expecting " + numRecordsRemainingInSplit
          + " records each with a length of " + recordLength
          + " bytes in the split with an effective size of "
          + splitSize + " bytes");
    }
    if (numBytesToSkip != 0) {
      start += inputStream.skip(numBytesToSkip);
    }
    this.pos = start;
  }

  @Override
  public synchronized boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    if (value == null) {
      value = new BytesWritable(new byte[recordLength]);
    }
    boolean dataRead = false;
    value.setSize(recordLength);
    byte[] record = value.getBytes();
    if (numRecordsRemainingInSplit > 0) {
      key.set(pos);
      int offset = 0;
      int numBytesToRead = recordLength;
      int numBytesRead = 0;
      while (numBytesToRead > 0) {
        numBytesRead = inputStream.read(record, offset, numBytesToRead);
        if (numBytesRead == -1) {
          // EOF
          break;
        }
        offset += numBytesRead;
        numBytesToRead -= numBytesRead;
      }
      numBytesRead = recordLength - numBytesToRead;
      pos += numBytesRead;
      if (numBytesRead > 0) {
        dataRead = true;
        if (numBytesRead >= recordLength) {
          if (!isCompressedInput) {
            numRecordsRemainingInSplit--;
          }
        } else {
          throw new IOException("Partial record(length = " + numBytesRead
              + ") found at the end of split.");
        }
      } else {
        numRecordsRemainingInSplit = 0L; // End of input.
      }
    }
    return dataRead;
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public BytesWritable getCurrentValue() {
    return value;
  }

  @Override
  public synchronized float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
    }
  }

  @Override
  public synchronized void close() throws IOException {
    try {
      if (inputStream != null) {
        inputStream.close();
        inputStream = null;
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        decompressor = null;
      }
    }
  }

  // This is called from the old FixedLengthRecordReader API implementation.
  public long getPos() {
    return pos;
  }

  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }

}
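Note (not part of the commit): to make the split-boundary arithmetic in initialize() concrete with illustrative numbers, take recordLength = 100 and an uncompressed split starting at byte 250 with length 500. Then start % recordLength = 50, so numBytesToSkip = 50 and the reader begins at offset 300; the effective split size is 500 - 50 = 450 bytes, giving (450 + 99) / 100 = 5 expected records, the last of which is read across the split's end while the next split's reader skips the bytes this reader consumed.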
@@ -0,0 +1,416 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;

import org.junit.BeforeClass;
import org.junit.Test;
import static junit.framework.Assert.*;

public class TestFixedLengthInputFormat {

  private static Log LOG;
  private static Configuration defaultConf;
  private static FileSystem localFs;
  private static Path workDir;
  private static Reporter voidReporter;

  // some chars for the record data
  private static char[] chars;
  private static Random charRand;

  @BeforeClass
  public static void onlyOnce() {
    try {
      LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
      defaultConf = new Configuration();
      defaultConf.set("fs.defaultFS", "file:///");
      localFs = FileSystem.getLocal(defaultConf);
      voidReporter = Reporter.NULL;
      // our set of chars
      chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
          + "(*&^%$#@!-=><?:\"{}][';/.,']").toCharArray();
      workDir =
          new Path(new Path(System.getProperty("test.build.data", "."), "data"),
          "TestKeyValueFixedLengthInputFormat");
      charRand = new Random();
    } catch (IOException e) {
      throw new RuntimeException("init failure", e);
    }
  }

  /**
   * 20 random tests of various record, file, and split sizes.  All tests have
   * uncompressed file as input.
   */
  @Test (timeout=500000)
  public void testFormat() throws IOException {
    runRandomTests(null);
  }

  /**
   * 20 random tests of various record, file, and split sizes.  All tests have
   * compressed file as input.
   */
  @Test (timeout=500000)
  public void testFormatCompressedIn() throws IOException {
    runRandomTests(new GzipCodec());
  }

  /**
   * Test with no record length set.
   */
  @Test (timeout=5000)
  public void testNoRecordLength() throws IOException {
    localFs.delete(workDir, true);
    Path file = new Path(workDir, new String("testFormat.txt"));
    createFile(file, null, 10, 10);
    // Set the fixed length record length config property
    Configuration testConf = new Configuration(defaultConf);
    JobConf job = new JobConf(testConf);
    FileInputFormat.setInputPaths(job, workDir);
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.configure(job);
    InputSplit splits[] = format.getSplits(job, 1);
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        RecordReader<LongWritable, BytesWritable> reader =
            format.getRecordReader(split, job, voidReporter);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for not setting record length:", exceptionThrown);
  }

  /**
   * Test with record length set to 0
   */
  @Test (timeout=5000)
  public void testZeroRecordLength() throws IOException {
    localFs.delete(workDir, true);
    Path file = new Path(workDir, new String("testFormat.txt"));
    createFile(file, null, 10, 10);
    // Set the fixed length record length config property
    Configuration testConf = new Configuration(defaultConf);
    JobConf job = new JobConf(testConf);
    FileInputFormat.setInputPaths(job, workDir);
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(job, 0);
    format.configure(job);
    InputSplit splits[] = format.getSplits(job, 1);
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        RecordReader<LongWritable, BytesWritable> reader =
            format.getRecordReader(split, job, voidReporter);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for zero record length:", exceptionThrown);
  }

  /**
   * Test with record length set to a negative value
   */
  @Test (timeout=5000)
  public void testNegativeRecordLength() throws IOException {
    localFs.delete(workDir, true);
    Path file = new Path(workDir, new String("testFormat.txt"));
    createFile(file, null, 10, 10);
    // Set the fixed length record length config property
    Configuration testConf = new Configuration(defaultConf);
    JobConf job = new JobConf(testConf);
    FileInputFormat.setInputPaths(job, workDir);
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(job, -10);
    format.configure(job);
    InputSplit splits[] = format.getSplits(job, 1);
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        RecordReader<LongWritable, BytesWritable> reader =
            format.getRecordReader(split, job, voidReporter);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for negative record length:", exceptionThrown);
  }

  /**
   * Test with partial record at the end of a compressed input file.
   */
  @Test (timeout=5000)
  public void testPartialRecordCompressedIn() throws IOException {
    CompressionCodec gzip = new GzipCodec();
    runPartialRecordTest(gzip);
  }

  /**
   * Test with partial record at the end of an uncompressed input file.
   */
  @Test (timeout=5000)
  public void testPartialRecordUncompressedIn() throws IOException {
    runPartialRecordTest(null);
  }

  /**
   * Test using the gzip codec with two input files.
   */
  @Test (timeout=5000)
  public void testGzipWithTwoInputs() throws IOException {
    CompressionCodec gzip = new GzipCodec();
    localFs.delete(workDir, true);
    // Create files with fixed length records with 5 byte long records.
    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
        "one  two  threefour five six  seveneightnine ten  ");
    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
        "ten  nine eightsevensix  five four threetwo  one  ");
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(defaultConf, 5);
    JobConf job = new JobConf(defaultConf);
    FileInputFormat.setInputPaths(job, workDir);
    ReflectionUtils.setConf(gzip, job);
    format.configure(job);
    InputSplit[] splits = format.getSplits(job, 100);
    assertEquals("compressed splits == 2", 2, splits.length);
    FileSplit tmp = (FileSplit) splits[0];
    if (tmp.getPath().getName().equals("part2.txt.gz")) {
      splits[0] = splits[1];
      splits[1] = tmp;
    }
    List<String> results = readSplit(format, splits[0], job);
    assertEquals("splits[0] length", 10, results.size());
    assertEquals("splits[0][5]", "six  ", results.get(5));
    results = readSplit(format, splits[1], job);
    assertEquals("splits[1] length", 10, results.size());
    assertEquals("splits[1][0]", "ten  ", results.get(0));
    assertEquals("splits[1][1]", "nine ", results.get(1));
  }

  // Create a file containing fixed length records with random data
  private ArrayList<String> createFile(Path targetFile, CompressionCodec codec,
                                       int recordLen,
                                       int numRecords) throws IOException {
    ArrayList<String> recordList = new ArrayList<String>(numRecords);
    OutputStream ostream = localFs.create(targetFile);
    if (codec != null) {
      ostream = codec.createOutputStream(ostream);
    }
    Writer writer = new OutputStreamWriter(ostream);
    try {
      StringBuffer sb = new StringBuffer();
      for (int i = 0; i < numRecords; i++) {
        for (int j = 0; j < recordLen; j++) {
          sb.append(chars[charRand.nextInt(chars.length)]);
        }
        String recordData = sb.toString();
        recordList.add(recordData);
        writer.write(recordData);
        sb.setLength(0);
      }
    } finally {
      writer.close();
    }
    return recordList;
  }

  private void runRandomTests(CompressionCodec codec) throws IOException {
    StringBuilder fileName = new StringBuilder("testFormat.txt");
    if (codec != null) {
      fileName.append(".gz");
    }
    localFs.delete(workDir, true);
    Path file = new Path(workDir, fileName.toString());
    int seed = new Random().nextInt();
    LOG.info("Seed = " + seed);
    Random random = new Random(seed);
    int MAX_TESTS = 20;
    LongWritable key = new LongWritable();
    BytesWritable value = new BytesWritable();

    for (int i = 0; i < MAX_TESTS; i++) {
      LOG.info("----------------------------------------------------------");
      // Maximum total records of 999
      int totalRecords = random.nextInt(999)+1;
      // Test an empty file
      if (i == 8) {
        totalRecords = 0;
      }
      // Maximum bytes in a record of 100K
      int recordLength = random.nextInt(1024*100)+1;
      // For the 11th test, force a record length of 1
      if (i == 10) {
        recordLength = 1;
      }
      // The total bytes in the test file
      int fileSize = (totalRecords * recordLength);
      LOG.info("totalRecords=" + totalRecords + " recordLength="
          + recordLength);
      // Create the test file
      ArrayList<String> recordList
          = createFile(file, codec, recordLength, totalRecords);
      assertTrue(localFs.exists(file));
      // Set the fixed length record length config property
      Configuration testConf = new Configuration(defaultConf);
      FixedLengthInputFormat.setRecordLength(testConf, recordLength);

      int numSplits = 1;
      // Arbitrarily set number of splits.
      if (i > 0) {
        if (i == (MAX_TESTS-1)) {
          // Test a split size that is less than record len
          numSplits = (int)(fileSize/Math.floor(recordLength/2));
        } else {
          if (MAX_TESTS % i == 0) {
            // Let us create a split size that is forced to be
            //  smaller than the end file itself, (ensures 1+ splits)
            numSplits = fileSize/(fileSize - random.nextInt(fileSize));
          } else {
            // Just pick a random split size with no upper bound
            numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
          }
        }
        LOG.info("Number of splits set to: " + numSplits);
      }

      // Create the job, and setup the input path
      JobConf job = new JobConf(testConf);
      FileInputFormat.setInputPaths(job, workDir);
      // Try splitting the file in a variety of sizes
      FixedLengthInputFormat format = new FixedLengthInputFormat();
      format.configure(job);
      InputSplit splits[] = format.getSplits(job, numSplits);
      LOG.info("Actual number of splits = " + splits.length);
      // Test combined split lengths = total file size
      long recordOffset = 0;
      int recordNumber = 0;
      for (InputSplit split : splits) {
        RecordReader<LongWritable, BytesWritable> reader =
            format.getRecordReader(split, job, voidReporter);
        Class<?> clazz = reader.getClass();
        assertEquals("RecordReader class should be FixedLengthRecordReader:",
            FixedLengthRecordReader.class, clazz);
        // Plow through the records in this split
        while (reader.next(key, value)) {
          assertEquals("Checking key", (long)(recordNumber*recordLength),
              key.get());
          String valueString =
              new String(value.getBytes(), 0, value.getLength());
          assertEquals("Checking record length:", recordLength,
              value.getLength());
          assertTrue("Checking for more records than expected:",
              recordNumber < totalRecords);
          String origRecord = recordList.get(recordNumber);
          assertEquals("Checking record content:", origRecord, valueString);
          recordNumber++;
        }
        reader.close();
      }
      assertEquals("Total original records should be total read records:",
          recordList.size(), recordNumber);
    }
  }

  private static void writeFile(FileSystem fs, Path name,
                                CompressionCodec codec,
                                String contents) throws IOException {
    OutputStream stm;
    if (codec == null) {
      stm = fs.create(name);
    } else {
      stm = codec.createOutputStream(fs.create(name));
    }
    stm.write(contents.getBytes());
    stm.close();
  }

  private static List<String> readSplit(FixedLengthInputFormat format,
                                        InputSplit split,
                                        JobConf job) throws IOException {
    List<String> result = new ArrayList<String>();
    RecordReader<LongWritable, BytesWritable> reader =
        format.getRecordReader(split, job, voidReporter);
    LongWritable key = reader.createKey();
    BytesWritable value = reader.createValue();
    while (reader.next(key, value)) {
      result.add(new String(value.getBytes(), 0, value.getLength()));
    }
    reader.close();
    return result;
  }

  private void runPartialRecordTest(CompressionCodec codec) throws IOException {
    localFs.delete(workDir, true);
    // Create a file with fixed length records with 5 byte long
    // records with a partial record at the end.
    StringBuilder fileName = new StringBuilder("testFormat.txt");
    if (codec != null) {
      fileName.append(".gz");
    }
    writeFile(localFs, new Path(workDir, fileName.toString()), codec,
        "one  two  threefour five six  seveneightnine ten");
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(defaultConf, 5);
    JobConf job = new JobConf(defaultConf);
    FileInputFormat.setInputPaths(job, workDir);
    if (codec != null) {
      ReflectionUtils.setConf(codec, job);
    }
    format.configure(job);
    InputSplit[] splits = format.getSplits(job, 100);
    if (codec != null) {
      assertEquals("compressed splits == 1", 1, splits.length);
    }
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        List<String> results = readSplit(format, split, job);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for partial record:", exceptionThrown);
  }

}
@@ -0,0 +1,461 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.MapContextImpl;

import org.apache.hadoop.util.ReflectionUtils;

import org.junit.BeforeClass;
import org.junit.Test;
import static junit.framework.Assert.*;

public class TestFixedLengthInputFormat {

  private static Log LOG;
  private static Configuration defaultConf;
  private static FileSystem localFs;
  private static Path workDir;

  // some chars for the record data
  private static char[] chars;
  private static Random charRand;

  @BeforeClass
  public static void onlyOnce() {
    try {
      LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
      defaultConf = new Configuration();
      defaultConf.set("fs.defaultFS", "file:///");
      localFs = FileSystem.getLocal(defaultConf);
      // our set of chars
      chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
          + "(*&^%$#@!-=><?:\"{}][';/.,']").toCharArray();
      workDir =
          new Path(new Path(System.getProperty("test.build.data", "."), "data"),
          "TestKeyValueFixedLengthInputFormat");
      charRand = new Random();
    } catch (IOException e) {
      throw new RuntimeException("init failure", e);
    }
  }

  /**
   * 20 random tests of various record, file, and split sizes.  All tests have
   * uncompressed file as input.
   */
  @Test (timeout=500000)
  public void testFormat() throws Exception {
    runRandomTests(null);
  }

  /**
   * 20 random tests of various record, file, and split sizes.  All tests have
   * compressed file as input.
   */
  @Test (timeout=500000)
  public void testFormatCompressedIn() throws Exception {
    runRandomTests(new GzipCodec());
  }

  /**
   * Test with no record length set.
   */
  @Test (timeout=5000)
  public void testNoRecordLength() throws Exception {
    localFs.delete(workDir, true);
    Path file = new Path(workDir, new String("testFormat.txt"));
    createFile(file, null, 10, 10);
    // Set the fixed length record length config property
    Configuration testConf = new Configuration(defaultConf);
    Job job = Job.getInstance(testConf);
    FileInputFormat.setInputPaths(job, workDir);
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    List<InputSplit> splits = format.getSplits(job);
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        TaskAttemptContext context = MapReduceTestUtil.
            createDummyMapTaskAttemptContext(job.getConfiguration());
        RecordReader<LongWritable, BytesWritable> reader =
            format.createRecordReader(split, context);
        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
            mcontext =
            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
        reader.initialize(split, mcontext);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for not setting record length:", exceptionThrown);
  }

  /**
   * Test with record length set to 0
   */
  @Test (timeout=5000)
  public void testZeroRecordLength() throws Exception {
    localFs.delete(workDir, true);
    Path file = new Path(workDir, new String("testFormat.txt"));
    createFile(file, null, 10, 10);
    // Set the fixed length record length config property
    Configuration testConf = new Configuration(defaultConf);
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(testConf, 0);
    Job job = Job.getInstance(testConf);
    FileInputFormat.setInputPaths(job, workDir);
    List<InputSplit> splits = format.getSplits(job);
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        TaskAttemptContext context =
            MapReduceTestUtil.createDummyMapTaskAttemptContext(
            job.getConfiguration());
        RecordReader<LongWritable, BytesWritable> reader =
            format.createRecordReader(split, context);
        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
            mcontext =
            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
        reader.initialize(split, mcontext);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for zero record length:", exceptionThrown);
  }

  /**
   * Test with record length set to a negative value
   */
  @Test (timeout=5000)
  public void testNegativeRecordLength() throws Exception {
    localFs.delete(workDir, true);
    Path file = new Path(workDir, new String("testFormat.txt"));
    createFile(file, null, 10, 10);
    // Set the fixed length record length config property
    Configuration testConf = new Configuration(defaultConf);
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(testConf, -10);
    Job job = Job.getInstance(testConf);
    FileInputFormat.setInputPaths(job, workDir);
    List<InputSplit> splits = format.getSplits(job);
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        TaskAttemptContext context = MapReduceTestUtil.
            createDummyMapTaskAttemptContext(job.getConfiguration());
        RecordReader<LongWritable, BytesWritable> reader =
            format.createRecordReader(split, context);
        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
            mcontext =
            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
        reader.initialize(split, mcontext);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for negative record length:", exceptionThrown);
  }

  /**
   * Test with partial record at the end of a compressed input file.
   */
  @Test (timeout=5000)
  public void testPartialRecordCompressedIn() throws Exception {
    CompressionCodec gzip = new GzipCodec();
    runPartialRecordTest(gzip);
  }

  /**
   * Test with partial record at the end of an uncompressed input file.
   */
  @Test (timeout=5000)
  public void testPartialRecordUncompressedIn() throws Exception {
    runPartialRecordTest(null);
  }

  /**
   * Test using the gzip codec with two input files.
   */
  @Test (timeout=5000)
  public void testGzipWithTwoInputs() throws Exception {
    CompressionCodec gzip = new GzipCodec();
    localFs.delete(workDir, true);
    // Create files with fixed length records with 5 byte long records.
    writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
        "one  two  threefour five six  seveneightnine ten  ");
    writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
        "ten  nine eightsevensix  five four threetwo  one  ");
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(defaultConf, 5);
    ReflectionUtils.setConf(gzip, defaultConf);
    Job job = Job.getInstance(defaultConf);
    FileInputFormat.setInputPaths(job, workDir);
    List<InputSplit> splits = format.getSplits(job);
    assertEquals("compressed splits == 2", 2, splits.size());
    FileSplit tmp = (FileSplit) splits.get(0);
    if (tmp.getPath().getName().equals("part2.txt.gz")) {
      splits.set(0, splits.get(1));
      splits.set(1, tmp);
    }
    List<String> results = readSplit(format, splits.get(0), job);
    assertEquals("splits[0] length", 10, results.size());
    assertEquals("splits[0][5]", "six  ", results.get(5));
    results = readSplit(format, splits.get(1), job);
    assertEquals("splits[1] length", 10, results.size());
    assertEquals("splits[1][0]", "ten  ", results.get(0));
    assertEquals("splits[1][1]", "nine ", results.get(1));
  }

  // Create a file containing fixed length records with random data
  private ArrayList<String> createFile(Path targetFile, CompressionCodec codec,
                                       int recordLen,
                                       int numRecords) throws IOException {
    ArrayList<String> recordList = new ArrayList<String>(numRecords);
    OutputStream ostream = localFs.create(targetFile);
    if (codec != null) {
      ostream = codec.createOutputStream(ostream);
    }
    Writer writer = new OutputStreamWriter(ostream);
    try {
      StringBuffer sb = new StringBuffer();
      for (int i = 0; i < numRecords; i++) {
        for (int j = 0; j < recordLen; j++) {
          sb.append(chars[charRand.nextInt(chars.length)]);
        }
        String recordData = sb.toString();
        recordList.add(recordData);
        writer.write(recordData);
        sb.setLength(0);
      }
    } finally {
      writer.close();
    }
    return recordList;
  }

  private void runRandomTests(CompressionCodec codec) throws Exception {
    StringBuilder fileName = new StringBuilder("testFormat.txt");
    if (codec != null) {
      fileName.append(".gz");
    }
    localFs.delete(workDir, true);
    Path file = new Path(workDir, fileName.toString());
    int seed = new Random().nextInt();
    LOG.info("Seed = " + seed);
    Random random = new Random(seed);
    int MAX_TESTS = 20;
    LongWritable key;
    BytesWritable value;

    for (int i = 0; i < MAX_TESTS; i++) {
      LOG.info("----------------------------------------------------------");
      // Maximum total records of 999
      int totalRecords = random.nextInt(999)+1;
      // Test an empty file
      if (i == 8) {
        totalRecords = 0;
      }
      // Maximum bytes in a record of 100K
      int recordLength = random.nextInt(1024*100)+1;
      // For the 11th test, force a record length of 1
      if (i == 10) {
        recordLength = 1;
      }
      // The total bytes in the test file
      int fileSize = (totalRecords * recordLength);
      LOG.info("totalRecords=" + totalRecords + " recordLength="
          + recordLength);
      // Create the test file
      ArrayList<String> recordList =
          createFile(file, codec, recordLength, totalRecords);
      assertTrue(localFs.exists(file));
      // Set the fixed length record length config property
      Configuration testConf = new Configuration(defaultConf);
      FixedLengthInputFormat.setRecordLength(testConf, recordLength);

      int numSplits = 1;
      // Arbitrarily set number of splits.
      if (i > 0) {
        if (i == (MAX_TESTS-1)) {
          // Test a split size that is less than record len
          numSplits = (int)(fileSize/Math.floor(recordLength/2));
        } else {
          if (MAX_TESTS % i == 0) {
            // Let us create a split size that is forced to be
            //  smaller than the end file itself, (ensures 1+ splits)
            numSplits = fileSize/(fileSize - random.nextInt(fileSize));
          } else {
            // Just pick a random split size with no upper bound
            numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
          }
        }
        LOG.info("Number of splits set to: " + numSplits);
      }
      testConf.setLong("mapreduce.input.fileinputformat.split.maxsize",
          (long)(fileSize/numSplits));

      // Create the job, and setup the input path
      Job job = Job.getInstance(testConf);
      FileInputFormat.setInputPaths(job, workDir);
      // Try splitting the file in a variety of sizes
      FixedLengthInputFormat format = new FixedLengthInputFormat();
      List<InputSplit> splits = format.getSplits(job);
      LOG.info("Actual number of splits = " + splits.size());
      // Test combined split lengths = total file size
      long recordOffset = 0;
      int recordNumber = 0;
      for (InputSplit split : splits) {
        TaskAttemptContext context = MapReduceTestUtil.
            createDummyMapTaskAttemptContext(job.getConfiguration());
        RecordReader<LongWritable, BytesWritable> reader =
            format.createRecordReader(split, context);
        MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
            mcontext =
            new MapContextImpl<LongWritable, BytesWritable, LongWritable,
            BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
            reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
        reader.initialize(split, mcontext);
        Class<?> clazz = reader.getClass();
        assertEquals("RecordReader class should be FixedLengthRecordReader:",
            FixedLengthRecordReader.class, clazz);
        // Plow through the records in this split
        while (reader.nextKeyValue()) {
          key = reader.getCurrentKey();
          value = reader.getCurrentValue();
          assertEquals("Checking key", (long)(recordNumber*recordLength),
              key.get());
          String valueString = new String(value.getBytes(), 0,
              value.getLength());
          assertEquals("Checking record length:", recordLength,
              value.getLength());
          assertTrue("Checking for more records than expected:",
              recordNumber < totalRecords);
          String origRecord = recordList.get(recordNumber);
          assertEquals("Checking record content:", origRecord, valueString);
          recordNumber++;
        }
        reader.close();
      }
      assertEquals("Total original records should be total read records:",
          recordList.size(), recordNumber);
    }
  }

  private static void writeFile(FileSystem fs, Path name,
                                CompressionCodec codec,
                                String contents) throws IOException {
    OutputStream stm;
    if (codec == null) {
      stm = fs.create(name);
    } else {
      stm = codec.createOutputStream(fs.create(name));
    }
    stm.write(contents.getBytes());
    stm.close();
  }

  private static List<String> readSplit(FixedLengthInputFormat format,
                                        InputSplit split,
                                        Job job) throws Exception {
    List<String> result = new ArrayList<String>();
    TaskAttemptContext context = MapReduceTestUtil.
        createDummyMapTaskAttemptContext(job.getConfiguration());
    RecordReader<LongWritable, BytesWritable> reader =
        format.createRecordReader(split, context);
    MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable>
        mcontext =
        new MapContextImpl<LongWritable, BytesWritable, LongWritable,
        BytesWritable>(job.getConfiguration(), context.getTaskAttemptID(),
        reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
    reader.initialize(split, mcontext);
    LongWritable key;
    BytesWritable value;
    while (reader.nextKeyValue()) {
      key = reader.getCurrentKey();
      value = reader.getCurrentValue();
      result.add(new String(value.getBytes(), 0, value.getLength()));
    }
    reader.close();
    return result;
  }

  private void runPartialRecordTest(CompressionCodec codec) throws Exception {
    localFs.delete(workDir, true);
    // Create a file with fixed length records with 5 byte long
    // records with a partial record at the end.
    StringBuilder fileName = new StringBuilder("testFormat.txt");
    if (codec != null) {
      fileName.append(".gz");
      ReflectionUtils.setConf(codec, defaultConf);
    }
    writeFile(localFs, new Path(workDir, fileName.toString()), codec,
        "one  two  threefour five six  seveneightnine ten");
    FixedLengthInputFormat format = new FixedLengthInputFormat();
    format.setRecordLength(defaultConf, 5);
    Job job = Job.getInstance(defaultConf);
    FileInputFormat.setInputPaths(job, workDir);
    List<InputSplit> splits = format.getSplits(job);
    if (codec != null) {
      assertEquals("compressed splits == 1", 1, splits.size());
    }
    boolean exceptionThrown = false;
    for (InputSplit split : splits) {
      try {
        List<String> results = readSplit(format, split, job);
      } catch(IOException ioe) {
        exceptionThrown = true;
        LOG.info("Exception message:" + ioe.getMessage());
      }
    }
    assertTrue("Exception for partial record:", exceptionThrown);
  }

}