diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b30c0852e51..ed0ac5adbf5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -27,6 +27,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe)
+ MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader
+ (Mariappan Asokan and BitsOfInfo via Sandy Ryza)
+
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java
new file mode 100644
index 00000000000..60ba69c5db8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+/**
+ * FixedLengthInputFormat is an input format used to read input files
+ * which contain fixed length records. The content of a record need not be
+ * text. It can be arbitrary binary data. Users must configure the record
+ * length property by calling:
+ * FixedLengthInputFormat.setRecordLength(conf, recordLength);
or
+ * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
+ *
+ * @see FixedLengthRecordReader
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FixedLengthInputFormat
+ extends FileInputFormat
+ implements JobConfigurable {
+
+ private CompressionCodecFactory compressionCodecs = null;
+
+ public static final String FIXED_RECORD_LENGTH =
+ "fixedlengthinputformat.record.length";
+
+ /**
+ * Set the length of each record
+ * @param conf configuration
+ * @param recordLength the length of a record
+ */
+ public static void setRecordLength(Configuration conf, int recordLength) {
+ conf.setInt(FIXED_RECORD_LENGTH, recordLength);
+ }
+
+ /**
+ * Get record length value
+ * @param conf configuration
+ * @return the record length, zero means none was set
+ */
+ public static int getRecordLength(Configuration conf) {
+ return conf.getInt(FIXED_RECORD_LENGTH, 0);
+ }
+
+ @Override
+ public void configure(JobConf conf) {
+ compressionCodecs = new CompressionCodecFactory(conf);
+ }
+
+ @Override
+ public RecordReader
+ getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
+ throws IOException {
+ reporter.setStatus(genericSplit.toString());
+ int recordLength = getRecordLength(job);
+ if (recordLength <= 0) {
+ throw new IOException("Fixed record length " + recordLength
+ + " is invalid. It should be set to a value greater than zero");
+ }
+ return new FixedLengthRecordReader(job, (FileSplit)genericSplit,
+ recordLength);
+ }
+
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ final CompressionCodec codec = compressionCodecs.getCodec(file);
+ return(null == codec);
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java
new file mode 100644
index 00000000000..6d85fda5c30
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A reader to read fixed length records from a split. Record offset is
+ * returned as key and the record as bytes is returned in value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedLengthRecordReader
+ implements RecordReader {
+
+ private int recordLength;
+ // Make use of the new API implementation to avoid code duplication.
+ private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader;
+
+ public FixedLengthRecordReader(Configuration job, FileSplit split,
+ int recordLength) throws IOException {
+ this.recordLength = recordLength;
+ reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader(
+ recordLength);
+ reader.initialize(job, split.getStart(), split.getLength(),
+ split.getPath());
+ }
+
+ @Override
+ public LongWritable createKey() {
+ return new LongWritable();
+ }
+
+ @Override
+ public BytesWritable createValue() {
+ return new BytesWritable(new byte[recordLength]);
+ }
+
+ @Override
+ public synchronized boolean next(LongWritable key, BytesWritable value)
+ throws IOException {
+ boolean dataRead = reader.nextKeyValue();
+ if (dataRead) {
+ LongWritable newKey = reader.getCurrentKey();
+ BytesWritable newValue = reader.getCurrentValue();
+ key.set(newKey.get());
+ value.set(newValue);
+ }
+ return dataRead;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return reader.getProgress();
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return reader.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java
new file mode 100644
index 00000000000..9c32e83c5c3
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * FixedLengthInputFormat is an input format used to read input files
+ * which contain fixed length records. The content of a record need not be
+ * text. It can be arbitrary binary data. Users must configure the record
+ * length property by calling:
+ * FixedLengthInputFormat.setRecordLength(conf, recordLength);
or
+ * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength);
+ *
+ * @see FixedLengthRecordReader
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class FixedLengthInputFormat
+ extends FileInputFormat {
+
+ public static final String FIXED_RECORD_LENGTH =
+ "fixedlengthinputformat.record.length";
+
+ /**
+ * Set the length of each record
+ * @param conf configuration
+ * @param recordLength the length of a record
+ */
+ public static void setRecordLength(Configuration conf, int recordLength) {
+ conf.setInt(FIXED_RECORD_LENGTH, recordLength);
+ }
+
+ /**
+ * Get record length value
+ * @param conf configuration
+ * @return the record length, zero means none was set
+ */
+ public static int getRecordLength(Configuration conf) {
+ return conf.getInt(FIXED_RECORD_LENGTH, 0);
+ }
+
+ @Override
+ public RecordReader
+ createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ int recordLength = getRecordLength(context.getConfiguration());
+ if (recordLength <= 0) {
+ throw new IOException("Fixed record length " + recordLength
+ + " is invalid. It should be set to a value greater than zero");
+ }
+ return new FixedLengthRecordReader(recordLength);
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ final CompressionCodec codec =
+ new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
+ return (null == codec);
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
new file mode 100644
index 00000000000..70fa7b2aec3
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+
+/**
+ * A reader to read fixed length records from a split. Record offset is
+ * returned as key and the record as bytes is returned in value.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedLengthRecordReader
+ extends RecordReader {
+ private static final Log LOG
+ = LogFactory.getLog(FixedLengthRecordReader.class);
+
+ private int recordLength;
+ private long start;
+ private long pos;
+ private long end;
+ private long numRecordsRemainingInSplit;
+ private FSDataInputStream fileIn;
+ private Seekable filePosition;
+ private LongWritable key;
+ private BytesWritable value;
+ private boolean isCompressedInput;
+ private Decompressor decompressor;
+ private InputStream inputStream;
+
+ public FixedLengthRecordReader(int recordLength) {
+ this.recordLength = recordLength;
+ }
+
+ @Override
+ public void initialize(InputSplit genericSplit,
+ TaskAttemptContext context) throws IOException {
+ FileSplit split = (FileSplit) genericSplit;
+ Configuration job = context.getConfiguration();
+ final Path file = split.getPath();
+ initialize(job, split.getStart(), split.getLength(), file);
+ }
+
+ // This is also called from the old FixedLengthRecordReader API implementation
+ public void initialize(Configuration job, long splitStart, long splitLength,
+ Path file) throws IOException {
+ start = splitStart;
+ end = start + splitLength;
+ long partialRecordLength = start % recordLength;
+ long numBytesToSkip = 0;
+ if (partialRecordLength != 0) {
+ numBytesToSkip = recordLength - partialRecordLength;
+ }
+
+ // open the file and seek to the start of the split
+ final FileSystem fs = file.getFileSystem(job);
+ fileIn = fs.open(file);
+
+ CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
+ if (null != codec) {
+ isCompressedInput = true;
+ decompressor = CodecPool.getDecompressor(codec);
+ CompressionInputStream cIn
+ = codec.createInputStream(fileIn, decompressor);
+ filePosition = cIn;
+ inputStream = cIn;
+ numRecordsRemainingInSplit = Long.MAX_VALUE;
+ LOG.info(
+ "Compressed input; cannot compute number of records in the split");
+ } else {
+ fileIn.seek(start);
+ filePosition = fileIn;
+ inputStream = fileIn;
+ long splitSize = end - start - numBytesToSkip;
+ numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength;
+ if (numRecordsRemainingInSplit < 0) {
+ numRecordsRemainingInSplit = 0;
+ }
+ LOG.info("Expecting " + numRecordsRemainingInSplit
+ + " records each with a length of " + recordLength
+ + " bytes in the split with an effective size of "
+ + splitSize + " bytes");
+ }
+ if (numBytesToSkip != 0) {
+ start += inputStream.skip(numBytesToSkip);
+ }
+ this.pos = start;
+ }
+
+ @Override
+ public synchronized boolean nextKeyValue() throws IOException {
+ if (key == null) {
+ key = new LongWritable();
+ }
+ if (value == null) {
+ value = new BytesWritable(new byte[recordLength]);
+ }
+ boolean dataRead = false;
+ value.setSize(recordLength);
+ byte[] record = value.getBytes();
+ if (numRecordsRemainingInSplit > 0) {
+ key.set(pos);
+ int offset = 0;
+ int numBytesToRead = recordLength;
+ int numBytesRead = 0;
+ while (numBytesToRead > 0) {
+ numBytesRead = inputStream.read(record, offset, numBytesToRead);
+ if (numBytesRead == -1) {
+ // EOF
+ break;
+ }
+ offset += numBytesRead;
+ numBytesToRead -= numBytesRead;
+ }
+ numBytesRead = recordLength - numBytesToRead;
+ pos += numBytesRead;
+ if (numBytesRead > 0) {
+ dataRead = true;
+ if (numBytesRead >= recordLength) {
+ if (!isCompressedInput) {
+ numRecordsRemainingInSplit--;
+ }
+ } else {
+ throw new IOException("Partial record(length = " + numBytesRead
+ + ") found at the end of split.");
+ }
+ } else {
+ numRecordsRemainingInSplit = 0L; // End of input.
+ }
+ }
+ return dataRead;
+ }
+
+ @Override
+ public LongWritable getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public BytesWritable getCurrentValue() {
+ return value;
+ }
+
+ @Override
+ public synchronized float getProgress() throws IOException {
+ if (start == end) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ try {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ }
+ }
+
+ // This is called from the old FixedLengthRecordReader API implementation.
+ public long getPos() {
+ return pos;
+ }
+
+ private long getFilePosition() throws IOException {
+ long retVal;
+ if (isCompressedInput && null != filePosition) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = pos;
+ }
+ return retVal;
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
new file mode 100644
index 00000000000..0c614eaf0c7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+public class TestFixedLengthInputFormat {
+
+ private static Log LOG;
+ private static Configuration defaultConf;
+ private static FileSystem localFs;
+ private static Path workDir;
+ private static Reporter voidReporter;
+
+ // some chars for the record data
+ private static char[] chars;
+ private static Random charRand;
+
+ @BeforeClass
+ public static void onlyOnce() {
+ try {
+ LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
+ defaultConf = new Configuration();
+ defaultConf.set("fs.defaultFS", "file:///");
+ localFs = FileSystem.getLocal(defaultConf);
+ voidReporter = Reporter.NULL;
+ // our set of chars
+ chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
+ + "(*&^%$#@!-=> reader =
+ format.getRecordReader(split, job, voidReporter);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for not setting record length:", exceptionThrown);
+ }
+
+ /**
+ * Test with record length set to 0
+ */
+ @Test (timeout=5000)
+ public void testZeroRecordLength() throws IOException {
+ localFs.delete(workDir, true);
+ Path file = new Path(workDir, new String("testFormat.txt"));
+ createFile(file, null, 10, 10);
+ // Set the fixed length record length config property
+ Configuration testConf = new Configuration(defaultConf);
+ JobConf job = new JobConf(testConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(job, 0);
+ format.configure(job);
+ InputSplit splits[] = format.getSplits(job, 1);
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ RecordReader reader =
+ format.getRecordReader(split, job, voidReporter);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for zero record length:", exceptionThrown);
+ }
+
+ /**
+ * Test with record length set to a negative value
+ */
+ @Test (timeout=5000)
+ public void testNegativeRecordLength() throws IOException {
+ localFs.delete(workDir, true);
+ Path file = new Path(workDir, new String("testFormat.txt"));
+ createFile(file, null, 10, 10);
+ // Set the fixed length record length config property
+ Configuration testConf = new Configuration(defaultConf);
+ JobConf job = new JobConf(testConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(job, -10);
+ format.configure(job);
+ InputSplit splits[] = format.getSplits(job, 1);
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ RecordReader reader =
+ format.getRecordReader(split, job, voidReporter);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for negative record length:", exceptionThrown);
+ }
+
+ /**
+ * Test with partial record at the end of a compressed input file.
+ */
+ @Test (timeout=5000)
+ public void testPartialRecordCompressedIn() throws IOException {
+ CompressionCodec gzip = new GzipCodec();
+ runPartialRecordTest(gzip);
+ }
+
+ /**
+ * Test with partial record at the end of an uncompressed input file.
+ */
+ @Test (timeout=5000)
+ public void testPartialRecordUncompressedIn() throws IOException {
+ runPartialRecordTest(null);
+ }
+
+ /**
+ * Test using the gzip codec with two input files.
+ */
+ @Test (timeout=5000)
+ public void testGzipWithTwoInputs() throws IOException {
+ CompressionCodec gzip = new GzipCodec();
+ localFs.delete(workDir, true);
+ // Create files with fixed length records with 5 byte long records.
+ writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+ "one two threefour five six seveneightnine ten ");
+ writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+ "ten nine eightsevensix five four threetwo one ");
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(defaultConf, 5);
+ JobConf job = new JobConf(defaultConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ ReflectionUtils.setConf(gzip, job);
+ format.configure(job);
+ InputSplit[] splits = format.getSplits(job, 100);
+ assertEquals("compressed splits == 2", 2, splits.length);
+ FileSplit tmp = (FileSplit) splits[0];
+ if (tmp.getPath().getName().equals("part2.txt.gz")) {
+ splits[0] = splits[1];
+ splits[1] = tmp;
+ }
+ List results = readSplit(format, splits[0], job);
+ assertEquals("splits[0] length", 10, results.size());
+ assertEquals("splits[0][5]", "six ", results.get(5));
+ results = readSplit(format, splits[1], job);
+ assertEquals("splits[1] length", 10, results.size());
+ assertEquals("splits[1][0]", "ten ", results.get(0));
+ assertEquals("splits[1][1]", "nine ", results.get(1));
+ }
+
+ // Create a file containing fixed length records with random data
+ private ArrayList createFile(Path targetFile, CompressionCodec codec,
+ int recordLen,
+ int numRecords) throws IOException {
+ ArrayList recordList = new ArrayList(numRecords);
+ OutputStream ostream = localFs.create(targetFile);
+ if (codec != null) {
+ ostream = codec.createOutputStream(ostream);
+ }
+ Writer writer = new OutputStreamWriter(ostream);
+ try {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < numRecords; i++) {
+ for (int j = 0; j < recordLen; j++) {
+ sb.append(chars[charRand.nextInt(chars.length)]);
+ }
+ String recordData = sb.toString();
+ recordList.add(recordData);
+ writer.write(recordData);
+ sb.setLength(0);
+ }
+ } finally {
+ writer.close();
+ }
+ return recordList;
+ }
+
+ private void runRandomTests(CompressionCodec codec) throws IOException {
+ StringBuilder fileName = new StringBuilder("testFormat.txt");
+ if (codec != null) {
+ fileName.append(".gz");
+ }
+ localFs.delete(workDir, true);
+ Path file = new Path(workDir, fileName.toString());
+ int seed = new Random().nextInt();
+ LOG.info("Seed = " + seed);
+ Random random = new Random(seed);
+ int MAX_TESTS = 20;
+ LongWritable key = new LongWritable();
+ BytesWritable value = new BytesWritable();
+
+ for (int i = 0; i < MAX_TESTS; i++) {
+ LOG.info("----------------------------------------------------------");
+ // Maximum total records of 999
+ int totalRecords = random.nextInt(999)+1;
+ // Test an empty file
+ if (i == 8) {
+ totalRecords = 0;
+ }
+ // Maximum bytes in a record of 100K
+ int recordLength = random.nextInt(1024*100)+1;
+ // For the 11th test, force a record length of 1
+ if (i == 10) {
+ recordLength = 1;
+ }
+ // The total bytes in the test file
+ int fileSize = (totalRecords * recordLength);
+ LOG.info("totalRecords=" + totalRecords + " recordLength="
+ + recordLength);
+ // Create the test file
+ ArrayList recordList
+ = createFile(file, codec, recordLength, totalRecords);
+ assertTrue(localFs.exists(file));
+ // Set the fixed length record length config property
+ Configuration testConf = new Configuration(defaultConf);
+ FixedLengthInputFormat.setRecordLength(testConf, recordLength);
+
+ int numSplits = 1;
+ // Arbitrarily set number of splits.
+ if (i > 0) {
+ if (i == (MAX_TESTS-1)) {
+ // Test a split size that is less than record len
+ numSplits = (int)(fileSize/Math.floor(recordLength/2));
+ } else {
+ if (MAX_TESTS % i == 0) {
+ // Let us create a split size that is forced to be
+ // smaller than the end file itself, (ensures 1+ splits)
+ numSplits = fileSize/(fileSize - random.nextInt(fileSize));
+ } else {
+ // Just pick a random split size with no upper bound
+ numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
+ }
+ }
+ LOG.info("Number of splits set to: " + numSplits);
+ }
+
+ // Create the job, and setup the input path
+ JobConf job = new JobConf(testConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ // Try splitting the file in a variety of sizes
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.configure(job);
+ InputSplit splits[] = format.getSplits(job, numSplits);
+ LOG.info("Actual number of splits = " + splits.length);
+ // Test combined split lengths = total file size
+ long recordOffset = 0;
+ int recordNumber = 0;
+ for (InputSplit split : splits) {
+ RecordReader reader =
+ format.getRecordReader(split, job, voidReporter);
+ Class> clazz = reader.getClass();
+ assertEquals("RecordReader class should be FixedLengthRecordReader:",
+ FixedLengthRecordReader.class, clazz);
+ // Plow through the records in this split
+ while (reader.next(key, value)) {
+ assertEquals("Checking key", (long)(recordNumber*recordLength),
+ key.get());
+ String valueString =
+ new String(value.getBytes(), 0, value.getLength());
+ assertEquals("Checking record length:", recordLength,
+ value.getLength());
+ assertTrue("Checking for more records than expected:",
+ recordNumber < totalRecords);
+ String origRecord = recordList.get(recordNumber);
+ assertEquals("Checking record content:", origRecord, valueString);
+ recordNumber++;
+ }
+ reader.close();
+ }
+ assertEquals("Total original records should be total read records:",
+ recordList.size(), recordNumber);
+ }
+ }
+
+ private static void writeFile(FileSystem fs, Path name,
+ CompressionCodec codec,
+ String contents) throws IOException {
+ OutputStream stm;
+ if (codec == null) {
+ stm = fs.create(name);
+ } else {
+ stm = codec.createOutputStream(fs.create(name));
+ }
+ stm.write(contents.getBytes());
+ stm.close();
+ }
+
+ private static List readSplit(FixedLengthInputFormat format,
+ InputSplit split,
+ JobConf job) throws IOException {
+ List result = new ArrayList();
+ RecordReader reader =
+ format.getRecordReader(split, job, voidReporter);
+ LongWritable key = reader.createKey();
+ BytesWritable value = reader.createValue();
+ while (reader.next(key, value)) {
+ result.add(new String(value.getBytes(), 0, value.getLength()));
+ }
+ reader.close();
+ return result;
+ }
+
+ private void runPartialRecordTest(CompressionCodec codec) throws IOException {
+ localFs.delete(workDir, true);
+ // Create a file with fixed length records with 5 byte long
+ // records with a partial record at the end.
+ StringBuilder fileName = new StringBuilder("testFormat.txt");
+ if (codec != null) {
+ fileName.append(".gz");
+ }
+ writeFile(localFs, new Path(workDir, fileName.toString()), codec,
+ "one two threefour five six seveneightnine ten");
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(defaultConf, 5);
+ JobConf job = new JobConf(defaultConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ if (codec != null) {
+ ReflectionUtils.setConf(codec, job);
+ }
+ format.configure(job);
+ InputSplit[] splits = format.getSplits(job, 100);
+ if (codec != null) {
+ assertEquals("compressed splits == 1", 1, splits.length);
+ }
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ List results = readSplit(format, split, job);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for partial record:", exceptionThrown);
+ }
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
new file mode 100644
index 00000000000..0c94a4d34ce
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static junit.framework.Assert.*;
+
+public class TestFixedLengthInputFormat {
+
+ private static Log LOG;
+ private static Configuration defaultConf;
+ private static FileSystem localFs;
+ private static Path workDir;
+
+ // some chars for the record data
+ private static char[] chars;
+ private static Random charRand;
+
+ @BeforeClass
+ public static void onlyOnce() {
+ try {
+ LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName());
+ defaultConf = new Configuration();
+ defaultConf.set("fs.defaultFS", "file:///");
+ localFs = FileSystem.getLocal(defaultConf);
+ // our set of chars
+ chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)"
+ + "(*&^%$#@!-=> splits = format.getSplits(job);
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ RecordReader reader =
+ format.createRecordReader(split, context);
+ MapContext
+ mcontext =
+ new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(),
+ reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mcontext);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for not setting record length:", exceptionThrown);
+ }
+
+ /**
+ * Test with record length set to 0
+ */
+ @Test (timeout=5000)
+ public void testZeroRecordLength() throws Exception {
+ localFs.delete(workDir, true);
+ Path file = new Path(workDir, new String("testFormat.txt"));
+ createFile(file, null, 10, 10);
+ // Set the fixed length record length config property
+ Configuration testConf = new Configuration(defaultConf);
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(testConf, 0);
+ Job job = Job.getInstance(testConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ List splits = format.getSplits(job);
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ TaskAttemptContext context =
+ MapReduceTestUtil.createDummyMapTaskAttemptContext(
+ job.getConfiguration());
+ RecordReader reader =
+ format.createRecordReader(split, context);
+ MapContext
+ mcontext =
+ new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(),
+ reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mcontext);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for zero record length:", exceptionThrown);
+ }
+
+ /**
+ * Test with record length set to a negative value
+ */
+ @Test (timeout=5000)
+ public void testNegativeRecordLength() throws Exception {
+ localFs.delete(workDir, true);
+ Path file = new Path(workDir, new String("testFormat.txt"));
+ createFile(file, null, 10, 10);
+ // Set the fixed length record length config property
+ Configuration testConf = new Configuration(defaultConf);
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(testConf, -10);
+ Job job = Job.getInstance(testConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ List splits = format.getSplits(job);
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ RecordReader reader =
+ format.createRecordReader(split, context);
+ MapContext
+ mcontext =
+ new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(),
+ reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mcontext);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for negative record length:", exceptionThrown);
+ }
+
+ /**
+ * Test with partial record at the end of a compressed input file.
+ */
+ @Test (timeout=5000)
+ public void testPartialRecordCompressedIn() throws Exception {
+ CompressionCodec gzip = new GzipCodec();
+ runPartialRecordTest(gzip);
+ }
+
+ /**
+ * Test with partial record at the end of an uncompressed input file.
+ */
+ @Test (timeout=5000)
+ public void testPartialRecordUncompressedIn() throws Exception {
+ runPartialRecordTest(null);
+ }
+
+ /**
+ * Test using the gzip codec with two input files.
+ */
+ @Test (timeout=5000)
+ public void testGzipWithTwoInputs() throws Exception {
+ CompressionCodec gzip = new GzipCodec();
+ localFs.delete(workDir, true);
+ // Create files with fixed length records with 5 byte long records.
+ writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip,
+ "one two threefour five six seveneightnine ten ");
+ writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
+ "ten nine eightsevensix five four threetwo one ");
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(defaultConf, 5);
+ ReflectionUtils.setConf(gzip, defaultConf);
+ Job job = Job.getInstance(defaultConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ List splits = format.getSplits(job);
+ assertEquals("compressed splits == 2", 2, splits.size());
+ FileSplit tmp = (FileSplit) splits.get(0);
+ if (tmp.getPath().getName().equals("part2.txt.gz")) {
+ splits.set(0, splits.get(1));
+ splits.set(1, tmp);
+ }
+ List results = readSplit(format, splits.get(0), job);
+ assertEquals("splits[0] length", 10, results.size());
+ assertEquals("splits[0][5]", "six ", results.get(5));
+ results = readSplit(format, splits.get(1), job);
+ assertEquals("splits[1] length", 10, results.size());
+ assertEquals("splits[1][0]", "ten ", results.get(0));
+ assertEquals("splits[1][1]", "nine ", results.get(1));
+ }
+
+ // Create a file containing fixed length records with random data
+ private ArrayList createFile(Path targetFile, CompressionCodec codec,
+ int recordLen,
+ int numRecords) throws IOException {
+ ArrayList recordList = new ArrayList(numRecords);
+ OutputStream ostream = localFs.create(targetFile);
+ if (codec != null) {
+ ostream = codec.createOutputStream(ostream);
+ }
+ Writer writer = new OutputStreamWriter(ostream);
+ try {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < numRecords; i++) {
+ for (int j = 0; j < recordLen; j++) {
+ sb.append(chars[charRand.nextInt(chars.length)]);
+ }
+ String recordData = sb.toString();
+ recordList.add(recordData);
+ writer.write(recordData);
+ sb.setLength(0);
+ }
+ } finally {
+ writer.close();
+ }
+ return recordList;
+ }
+
+ private void runRandomTests(CompressionCodec codec) throws Exception {
+ StringBuilder fileName = new StringBuilder("testFormat.txt");
+ if (codec != null) {
+ fileName.append(".gz");
+ }
+ localFs.delete(workDir, true);
+ Path file = new Path(workDir, fileName.toString());
+ int seed = new Random().nextInt();
+ LOG.info("Seed = " + seed);
+ Random random = new Random(seed);
+ int MAX_TESTS = 20;
+ LongWritable key;
+ BytesWritable value;
+
+ for (int i = 0; i < MAX_TESTS; i++) {
+ LOG.info("----------------------------------------------------------");
+ // Maximum total records of 999
+ int totalRecords = random.nextInt(999)+1;
+ // Test an empty file
+ if (i == 8) {
+ totalRecords = 0;
+ }
+ // Maximum bytes in a record of 100K
+ int recordLength = random.nextInt(1024*100)+1;
+ // For the 11th test, force a record length of 1
+ if (i == 10) {
+ recordLength = 1;
+ }
+ // The total bytes in the test file
+ int fileSize = (totalRecords * recordLength);
+ LOG.info("totalRecords=" + totalRecords + " recordLength="
+ + recordLength);
+ // Create the test file
+ ArrayList recordList =
+ createFile(file, codec, recordLength, totalRecords);
+ assertTrue(localFs.exists(file));
+ // Set the fixed length record length config property
+ Configuration testConf = new Configuration(defaultConf);
+ FixedLengthInputFormat.setRecordLength(testConf, recordLength);
+
+ int numSplits = 1;
+ // Arbitrarily set number of splits.
+ if (i > 0) {
+ if (i == (MAX_TESTS-1)) {
+ // Test a split size that is less than record len
+ numSplits = (int)(fileSize/Math.floor(recordLength/2));
+ } else {
+ if (MAX_TESTS % i == 0) {
+ // Let us create a split size that is forced to be
+ // smaller than the end file itself, (ensures 1+ splits)
+ numSplits = fileSize/(fileSize - random.nextInt(fileSize));
+ } else {
+ // Just pick a random split size with no upper bound
+ numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE));
+ }
+ }
+ LOG.info("Number of splits set to: " + numSplits);
+ }
+ testConf.setLong("mapreduce.input.fileinputformat.split.maxsize",
+ (long)(fileSize/numSplits));
+
+ // Create the job, and setup the input path
+ Job job = Job.getInstance(testConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ // Try splitting the file in a variety of sizes
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ List splits = format.getSplits(job);
+ LOG.info("Actual number of splits = " + splits.size());
+ // Test combined split lengths = total file size
+ long recordOffset = 0;
+ int recordNumber = 0;
+ for (InputSplit split : splits) {
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ RecordReader reader =
+ format.createRecordReader(split, context);
+ MapContext
+ mcontext =
+ new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(),
+ reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mcontext);
+ Class> clazz = reader.getClass();
+ assertEquals("RecordReader class should be FixedLengthRecordReader:",
+ FixedLengthRecordReader.class, clazz);
+ // Plow through the records in this split
+ while (reader.nextKeyValue()) {
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ assertEquals("Checking key", (long)(recordNumber*recordLength),
+ key.get());
+ String valueString = new String(value.getBytes(), 0,
+ value.getLength());
+ assertEquals("Checking record length:", recordLength,
+ value.getLength());
+ assertTrue("Checking for more records than expected:",
+ recordNumber < totalRecords);
+ String origRecord = recordList.get(recordNumber);
+ assertEquals("Checking record content:", origRecord, valueString);
+ recordNumber++;
+ }
+ reader.close();
+ }
+ assertEquals("Total original records should be total read records:",
+ recordList.size(), recordNumber);
+ }
+ }
+
+ private static void writeFile(FileSystem fs, Path name,
+ CompressionCodec codec,
+ String contents) throws IOException {
+ OutputStream stm;
+ if (codec == null) {
+ stm = fs.create(name);
+ } else {
+ stm = codec.createOutputStream(fs.create(name));
+ }
+ stm.write(contents.getBytes());
+ stm.close();
+ }
+
+ private static List readSplit(FixedLengthInputFormat format,
+ InputSplit split,
+ Job job) throws Exception {
+ List result = new ArrayList();
+ TaskAttemptContext context = MapReduceTestUtil.
+ createDummyMapTaskAttemptContext(job.getConfiguration());
+ RecordReader reader =
+ format.createRecordReader(split, context);
+ MapContext
+ mcontext =
+ new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(),
+ reader, null, null, MapReduceTestUtil.createDummyReporter(), split);
+ reader.initialize(split, mcontext);
+ LongWritable key;
+ BytesWritable value;
+ while (reader.nextKeyValue()) {
+ key = reader.getCurrentKey();
+ value = reader.getCurrentValue();
+ result.add(new String(value.getBytes(), 0, value.getLength()));
+ }
+ reader.close();
+ return result;
+ }
+
+ private void runPartialRecordTest(CompressionCodec codec) throws Exception {
+ localFs.delete(workDir, true);
+ // Create a file with fixed length records with 5 byte long
+ // records with a partial record at the end.
+ StringBuilder fileName = new StringBuilder("testFormat.txt");
+ if (codec != null) {
+ fileName.append(".gz");
+ ReflectionUtils.setConf(codec, defaultConf);
+ }
+ writeFile(localFs, new Path(workDir, fileName.toString()), codec,
+ "one two threefour five six seveneightnine ten");
+ FixedLengthInputFormat format = new FixedLengthInputFormat();
+ format.setRecordLength(defaultConf, 5);
+ Job job = Job.getInstance(defaultConf);
+ FileInputFormat.setInputPaths(job, workDir);
+ List splits = format.getSplits(job);
+ if (codec != null) {
+ assertEquals("compressed splits == 1", 1, splits.size());
+ }
+ boolean exceptionThrown = false;
+ for (InputSplit split : splits) {
+ try {
+ List results = readSplit(format, split, job);
+ } catch(IOException ioe) {
+ exceptionThrown = true;
+ LOG.info("Exception message:" + ioe.getMessage());
+ }
+ }
+ assertTrue("Exception for partial record:", exceptionThrown);
+ }
+
+}