From 7b819a313e4e7c1d6f5111c45c4cc32227d627ff Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Tue, 12 Nov 2013 03:11:19 +0000 Subject: [PATCH] MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader (Mariappan Asokan and BitsOfInfo via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1540932 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/FixedLengthInputFormat.java | 97 ++++ .../mapred/FixedLengthRecordReader.java | 89 ++++ .../lib/input/FixedLengthInputFormat.java | 90 ++++ .../lib/input/FixedLengthRecordReader.java | 220 +++++++++ .../mapred/TestFixedLengthInputFormat.java | 416 ++++++++++++++++ .../lib/input/TestFixedLengthInputFormat.java | 461 ++++++++++++++++++ 7 files changed, 1376 insertions(+) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b30c0852e51..ed0ac5adbf5 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -27,6 +27,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-4421. Run MapReduce framework via the distributed cache (jlowe) + MAPREDUCE-1176. FixedLengthInputFormat and FixedLengthRecordReader + (Mariappan Asokan and BitsOfInfo via Sandy Ryza) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java new file mode 100644 index 00000000000..60ba69c5db8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthInputFormat.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; + +/** + * FixedLengthInputFormat is an input format used to read input files + * which contain fixed length records. The content of a record need not be + * text. It can be arbitrary binary data. Users must configure the record + * length property by calling: + * FixedLengthInputFormat.setRecordLength(conf, recordLength);
or + * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength); + *
+ * @see FixedLengthRecordReader + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FixedLengthInputFormat + extends FileInputFormat + implements JobConfigurable { + + private CompressionCodecFactory compressionCodecs = null; + + public static final String FIXED_RECORD_LENGTH = + "fixedlengthinputformat.record.length"; + + /** + * Set the length of each record + * @param conf configuration + * @param recordLength the length of a record + */ + public static void setRecordLength(Configuration conf, int recordLength) { + conf.setInt(FIXED_RECORD_LENGTH, recordLength); + } + + /** + * Get record length value + * @param conf configuration + * @return the record length, zero means none was set + */ + public static int getRecordLength(Configuration conf) { + return conf.getInt(FIXED_RECORD_LENGTH, 0); + } + + @Override + public void configure(JobConf conf) { + compressionCodecs = new CompressionCodecFactory(conf); + } + + @Override + public RecordReader + getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter) + throws IOException { + reporter.setStatus(genericSplit.toString()); + int recordLength = getRecordLength(job); + if (recordLength <= 0) { + throw new IOException("Fixed record length " + recordLength + + " is invalid. It should be set to a value greater than zero"); + } + return new FixedLengthRecordReader(job, (FileSplit)genericSplit, + recordLength); + } + + @Override + protected boolean isSplitable(FileSystem fs, Path file) { + final CompressionCodec codec = compressionCodecs.getCodec(file); + return(null == codec); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java new file mode 100644 index 00000000000..6d85fda5c30 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FixedLengthRecordReader.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; + +/** + * A reader to read fixed length records from a split. Record offset is + * returned as key and the record as bytes is returned in value. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedLengthRecordReader + implements RecordReader { + + private int recordLength; + // Make use of the new API implementation to avoid code duplication. + private org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader reader; + + public FixedLengthRecordReader(Configuration job, FileSplit split, + int recordLength) throws IOException { + this.recordLength = recordLength; + reader = new org.apache.hadoop.mapreduce.lib.input.FixedLengthRecordReader( + recordLength); + reader.initialize(job, split.getStart(), split.getLength(), + split.getPath()); + } + + @Override + public LongWritable createKey() { + return new LongWritable(); + } + + @Override + public BytesWritable createValue() { + return new BytesWritable(new byte[recordLength]); + } + + @Override + public synchronized boolean next(LongWritable key, BytesWritable value) + throws IOException { + boolean dataRead = reader.nextKeyValue(); + if (dataRead) { + LongWritable newKey = reader.getCurrentKey(); + BytesWritable newValue = reader.getCurrentValue(); + key.set(newKey.get()); + value.set(newValue); + } + return dataRead; + } + + @Override + public float getProgress() throws IOException { + return reader.getProgress(); + } + + @Override + public synchronized long getPos() throws IOException { + return reader.getPos(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java new file mode 100644 index 00000000000..9c32e83c5c3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthInputFormat.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * FixedLengthInputFormat is an input format used to read input files + * which contain fixed length records. The content of a record need not be + * text. It can be arbitrary binary data. Users must configure the record + * length property by calling: + * FixedLengthInputFormat.setRecordLength(conf, recordLength);
or + * conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, recordLength); + *
+ * @see FixedLengthRecordReader + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class FixedLengthInputFormat + extends FileInputFormat { + + public static final String FIXED_RECORD_LENGTH = + "fixedlengthinputformat.record.length"; + + /** + * Set the length of each record + * @param conf configuration + * @param recordLength the length of a record + */ + public static void setRecordLength(Configuration conf, int recordLength) { + conf.setInt(FIXED_RECORD_LENGTH, recordLength); + } + + /** + * Get record length value + * @param conf configuration + * @return the record length, zero means none was set + */ + public static int getRecordLength(Configuration conf) { + return conf.getInt(FIXED_RECORD_LENGTH, 0); + } + + @Override + public RecordReader + createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + int recordLength = getRecordLength(context.getConfiguration()); + if (recordLength <= 0) { + throw new IOException("Fixed record length " + recordLength + + " is invalid. It should be set to a value greater than zero"); + } + return new FixedLengthRecordReader(recordLength); + } + + @Override + protected boolean isSplitable(JobContext context, Path file) { + final CompressionCodec codec = + new CompressionCodecFactory(context.getConfiguration()).getCodec(file); + return (null == codec); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java new file mode 100644 index 00000000000..70fa7b2aec3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FixedLengthRecordReader.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; + +/** + * A reader to read fixed length records from a split. Record offset is + * returned as key and the record as bytes is returned in value. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedLengthRecordReader + extends RecordReader { + private static final Log LOG + = LogFactory.getLog(FixedLengthRecordReader.class); + + private int recordLength; + private long start; + private long pos; + private long end; + private long numRecordsRemainingInSplit; + private FSDataInputStream fileIn; + private Seekable filePosition; + private LongWritable key; + private BytesWritable value; + private boolean isCompressedInput; + private Decompressor decompressor; + private InputStream inputStream; + + public FixedLengthRecordReader(int recordLength) { + this.recordLength = recordLength; + } + + @Override + public void initialize(InputSplit genericSplit, + TaskAttemptContext context) throws IOException { + FileSplit split = (FileSplit) genericSplit; + Configuration job = context.getConfiguration(); + final Path file = split.getPath(); + initialize(job, split.getStart(), split.getLength(), file); + } + + // This is also called from the old FixedLengthRecordReader API implementation + public void initialize(Configuration job, long splitStart, long splitLength, + Path file) throws IOException { + start = splitStart; + end = start + splitLength; + long partialRecordLength = start % recordLength; + long numBytesToSkip = 0; + if (partialRecordLength != 0) { + numBytesToSkip = recordLength - partialRecordLength; + } + + // open the file and seek to the start of the split + final FileSystem fs = file.getFileSystem(job); + fileIn = fs.open(file); + + CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file); + if (null != codec) { + isCompressedInput = true; + decompressor = CodecPool.getDecompressor(codec); + CompressionInputStream cIn + = codec.createInputStream(fileIn, decompressor); + filePosition = cIn; + inputStream = cIn; + numRecordsRemainingInSplit = Long.MAX_VALUE; + LOG.info( + "Compressed input; cannot compute number of records in the split"); + } else { + fileIn.seek(start); + filePosition = fileIn; + inputStream = fileIn; + long splitSize = end - start - numBytesToSkip; + numRecordsRemainingInSplit = (splitSize + recordLength - 1)/recordLength; + if (numRecordsRemainingInSplit < 0) { + numRecordsRemainingInSplit = 0; + } + LOG.info("Expecting " + numRecordsRemainingInSplit + + " records each with a length of " + 
recordLength + + " bytes in the split with an effective size of " + + splitSize + " bytes"); + } + if (numBytesToSkip != 0) { + start += inputStream.skip(numBytesToSkip); + } + this.pos = start; + } + + @Override + public synchronized boolean nextKeyValue() throws IOException { + if (key == null) { + key = new LongWritable(); + } + if (value == null) { + value = new BytesWritable(new byte[recordLength]); + } + boolean dataRead = false; + value.setSize(recordLength); + byte[] record = value.getBytes(); + if (numRecordsRemainingInSplit > 0) { + key.set(pos); + int offset = 0; + int numBytesToRead = recordLength; + int numBytesRead = 0; + while (numBytesToRead > 0) { + numBytesRead = inputStream.read(record, offset, numBytesToRead); + if (numBytesRead == -1) { + // EOF + break; + } + offset += numBytesRead; + numBytesToRead -= numBytesRead; + } + numBytesRead = recordLength - numBytesToRead; + pos += numBytesRead; + if (numBytesRead > 0) { + dataRead = true; + if (numBytesRead >= recordLength) { + if (!isCompressedInput) { + numRecordsRemainingInSplit--; + } + } else { + throw new IOException("Partial record(length = " + numBytesRead + + ") found at the end of split."); + } + } else { + numRecordsRemainingInSplit = 0L; // End of input. + } + } + return dataRead; + } + + @Override + public LongWritable getCurrentKey() { + return key; + } + + @Override + public BytesWritable getCurrentValue() { + return value; + } + + @Override + public synchronized float getProgress() throws IOException { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start)); + } + } + + @Override + public synchronized void close() throws IOException { + try { + if (inputStream != null) { + inputStream.close(); + inputStream = null; + } + } finally { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + } + } + + // This is called from the old FixedLengthRecordReader API implementation. + public long getPos() { + return pos; + } + + private long getFilePosition() throws IOException { + long retVal; + if (isCompressedInput && null != filePosition) { + retVal = filePosition.getPos(); + } else { + retVal = pos; + } + return retVal; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java new file mode 100644 index 00000000000..0c614eaf0c7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java @@ -0,0 +1,416 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.util.ReflectionUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import static junit.framework.Assert.*; + +public class TestFixedLengthInputFormat { + + private static Log LOG; + private static Configuration defaultConf; + private static FileSystem localFs; + private static Path workDir; + private static Reporter voidReporter; + + // some chars for the record data + private static char[] chars; + private static Random charRand; + + @BeforeClass + public static void onlyOnce() { + try { + LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName()); + defaultConf = new Configuration(); + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + voidReporter = Reporter.NULL; + // our set of chars + chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)" + + "(*&^%$#@!-=> reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for not setting record length:", exceptionThrown); + } + + /** + * Test with record length set to 0 + */ + @Test (timeout=5000) + public void testZeroRecordLength() throws IOException { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(job, 0); + format.configure(job); + InputSplit splits[] = format.getSplits(job, 1); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for zero record length:", exceptionThrown); + } + + /** + * Test with record length set to a negative value + */ + @Test (timeout=5000) + public void testNegativeRecordLength() throws IOException { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(job, -10); + format.configure(job); + InputSplit splits[] = format.getSplits(job, 1); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + RecordReader 
reader = + format.getRecordReader(split, job, voidReporter); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for negative record length:", exceptionThrown); + } + + /** + * Test with partial record at the end of a compressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordCompressedIn() throws IOException { + CompressionCodec gzip = new GzipCodec(); + runPartialRecordTest(gzip); + } + + /** + * Test with partial record at the end of an uncompressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordUncompressedIn() throws IOException { + runPartialRecordTest(null); + } + + /** + * Test using the gzip codec with two input files. + */ + @Test (timeout=5000) + public void testGzipWithTwoInputs() throws IOException { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir, true); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + JobConf job = new JobConf(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + ReflectionUtils.setConf(gzip, job); + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + assertEquals("compressed splits == 2", 2, splits.length); + FileSplit tmp = (FileSplit) splits[0]; + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits[0] = splits[1]; + splits[1] = tmp; + } + List results = readSplit(format, splits[0], job); + assertEquals("splits[0] length", 10, results.size()); + assertEquals("splits[0][5]", "six ", results.get(5)); + results = readSplit(format, splits[1], job); + assertEquals("splits[1] length", 10, results.size()); + assertEquals("splits[1][0]", "ten ", results.get(0)); + assertEquals("splits[1][1]", "nine ", results.get(1)); + } + + // Create a file containing fixed length records with random data + private ArrayList createFile(Path targetFile, CompressionCodec codec, + int recordLen, + int numRecords) throws IOException { + ArrayList recordList = new ArrayList(numRecords); + OutputStream ostream = localFs.create(targetFile); + if (codec != null) { + ostream = codec.createOutputStream(ostream); + } + Writer writer = new OutputStreamWriter(ostream); + try { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < numRecords; i++) { + for (int j = 0; j < recordLen; j++) { + sb.append(chars[charRand.nextInt(chars.length)]); + } + String recordData = sb.toString(); + recordList.add(recordData); + writer.write(recordData); + sb.setLength(0); + } + } finally { + writer.close(); + } + return recordList; + } + + private void runRandomTests(CompressionCodec codec) throws IOException { + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + localFs.delete(workDir, true); + Path file = new Path(workDir, fileName.toString()); + int seed = new Random().nextInt(); + LOG.info("Seed = " + seed); + Random random = new Random(seed); + int MAX_TESTS = 20; + LongWritable key = new LongWritable(); + BytesWritable value = new BytesWritable(); + + for (int i = 0; i < MAX_TESTS; i++) { + LOG.info("----------------------------------------------------------"); + // Maximum total 
records of 999 + int totalRecords = random.nextInt(999)+1; + // Test an empty file + if (i == 8) { + totalRecords = 0; + } + // Maximum bytes in a record of 100K + int recordLength = random.nextInt(1024*100)+1; + // For the 11th test, force a record length of 1 + if (i == 10) { + recordLength = 1; + } + // The total bytes in the test file + int fileSize = (totalRecords * recordLength); + LOG.info("totalRecords=" + totalRecords + " recordLength=" + + recordLength); + // Create the test file + ArrayList recordList + = createFile(file, codec, recordLength, totalRecords); + assertTrue(localFs.exists(file)); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat.setRecordLength(testConf, recordLength); + + int numSplits = 1; + // Arbitrarily set number of splits. + if (i > 0) { + if (i == (MAX_TESTS-1)) { + // Test a split size that is less than record len + numSplits = (int)(fileSize/Math.floor(recordLength/2)); + } else { + if (MAX_TESTS % i == 0) { + // Let us create a split size that is forced to be + // smaller than the end file itself, (ensures 1+ splits) + numSplits = fileSize/(fileSize - random.nextInt(fileSize)); + } else { + // Just pick a random split size with no upper bound + numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE)); + } + } + LOG.info("Number of splits set to: " + numSplits); + } + + // Create the job, and setup the input path + JobConf job = new JobConf(testConf); + FileInputFormat.setInputPaths(job, workDir); + // Try splitting the file in a variety of sizes + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.configure(job); + InputSplit splits[] = format.getSplits(job, numSplits); + LOG.info("Actual number of splits = " + splits.length); + // Test combined split lengths = total file size + long recordOffset = 0; + int recordNumber = 0; + for (InputSplit split : splits) { + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + Class clazz = reader.getClass(); + assertEquals("RecordReader class should be FixedLengthRecordReader:", + FixedLengthRecordReader.class, clazz); + // Plow through the records in this split + while (reader.next(key, value)) { + assertEquals("Checking key", (long)(recordNumber*recordLength), + key.get()); + String valueString = + new String(value.getBytes(), 0, value.getLength()); + assertEquals("Checking record length:", recordLength, + value.getLength()); + assertTrue("Checking for more records than expected:", + recordNumber < totalRecords); + String origRecord = recordList.get(recordNumber); + assertEquals("Checking record content:", origRecord, valueString); + recordNumber++; + } + reader.close(); + } + assertEquals("Total original records should be total read records:", + recordList.size(), recordNumber); + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(FixedLengthInputFormat format, + InputSplit split, + JobConf job) throws IOException { + List result = new ArrayList(); + RecordReader reader = + format.getRecordReader(split, job, voidReporter); + LongWritable key = reader.createKey(); + BytesWritable value = reader.createValue(); + while (reader.next(key, value)) { + result.add(new 
String(value.getBytes(), 0, value.getLength())); + } + reader.close(); + return result; + } + + private void runPartialRecordTest(CompressionCodec codec) throws IOException { + localFs.delete(workDir, true); + // Create a file with fixed length records with 5 byte long + // records with a partial record at the end. + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + JobConf job = new JobConf(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + if (codec != null) { + ReflectionUtils.setConf(codec, job); + } + format.configure(job); + InputSplit[] splits = format.getSplits(job, 100); + if (codec != null) { + assertEquals("compressed splits == 1", 1, splits.length); + } + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + List results = readSplit(format, split, job); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for partial record:", exceptionThrown); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java new file mode 100644 index 00000000000..0c94a4d34ce --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java @@ -0,0 +1,461 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.input; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.task.MapContextImpl; + +import org.apache.hadoop.util.ReflectionUtils; + +import org.junit.BeforeClass; +import org.junit.Test; +import static junit.framework.Assert.*; + +public class TestFixedLengthInputFormat { + + private static Log LOG; + private static Configuration defaultConf; + private static FileSystem localFs; + private static Path workDir; + + // some chars for the record data + private static char[] chars; + private static Random charRand; + + @BeforeClass + public static void onlyOnce() { + try { + LOG = LogFactory.getLog(TestFixedLengthInputFormat.class.getName()); + defaultConf = new Configuration(); + defaultConf.set("fs.defaultFS", "file:///"); + localFs = FileSystem.getLocal(defaultConf); + // our set of chars + chars = ("abcdefghijklmnopqrstuvABCDEFGHIJKLMN OPQRSTUVWXYZ1234567890)" + + "(*&^%$#@!-=> splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = MapReduceTestUtil. 
+ createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for not setting record length:", exceptionThrown); + } + + /** + * Test with record length set to 0 + */ + @Test (timeout=5000) + public void testZeroRecordLength() throws Exception { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(testConf, 0); + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = + MapReduceTestUtil.createDummyMapTaskAttemptContext( + job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for zero record length:", exceptionThrown); + } + + /** + * Test with record length set to a negative value + */ + @Test (timeout=5000) + public void testNegativeRecordLength() throws Exception { + localFs.delete(workDir, true); + Path file = new Path(workDir, new String("testFormat.txt")); + createFile(file, null, 10, 10); + // Set the fixed length record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(testConf, -10); + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for negative record length:", exceptionThrown); + } + + /** + * Test with partial record at the end of a compressed input file. + */ + @Test (timeout=5000) + public void testPartialRecordCompressedIn() throws Exception { + CompressionCodec gzip = new GzipCodec(); + runPartialRecordTest(gzip); + } + + /** + * Test with partial record at the end of an uncompressed input file. 
+ */ + @Test (timeout=5000) + public void testPartialRecordUncompressedIn() throws Exception { + runPartialRecordTest(null); + } + + /** + * Test using the gzip codec with two input files. + */ + @Test (timeout=5000) + public void testGzipWithTwoInputs() throws Exception { + CompressionCodec gzip = new GzipCodec(); + localFs.delete(workDir, true); + // Create files with fixed length records with 5 byte long records. + writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, + "one two threefour five six seveneightnine ten "); + writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip, + "ten nine eightsevensix five four threetwo one "); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + ReflectionUtils.setConf(gzip, defaultConf); + Job job = Job.getInstance(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + assertEquals("compressed splits == 2", 2, splits.size()); + FileSplit tmp = (FileSplit) splits.get(0); + if (tmp.getPath().getName().equals("part2.txt.gz")) { + splits.set(0, splits.get(1)); + splits.set(1, tmp); + } + List results = readSplit(format, splits.get(0), job); + assertEquals("splits[0] length", 10, results.size()); + assertEquals("splits[0][5]", "six ", results.get(5)); + results = readSplit(format, splits.get(1), job); + assertEquals("splits[1] length", 10, results.size()); + assertEquals("splits[1][0]", "ten ", results.get(0)); + assertEquals("splits[1][1]", "nine ", results.get(1)); + } + + // Create a file containing fixed length records with random data + private ArrayList createFile(Path targetFile, CompressionCodec codec, + int recordLen, + int numRecords) throws IOException { + ArrayList recordList = new ArrayList(numRecords); + OutputStream ostream = localFs.create(targetFile); + if (codec != null) { + ostream = codec.createOutputStream(ostream); + } + Writer writer = new OutputStreamWriter(ostream); + try { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < numRecords; i++) { + for (int j = 0; j < recordLen; j++) { + sb.append(chars[charRand.nextInt(chars.length)]); + } + String recordData = sb.toString(); + recordList.add(recordData); + writer.write(recordData); + sb.setLength(0); + } + } finally { + writer.close(); + } + return recordList; + } + + private void runRandomTests(CompressionCodec codec) throws Exception { + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + } + localFs.delete(workDir, true); + Path file = new Path(workDir, fileName.toString()); + int seed = new Random().nextInt(); + LOG.info("Seed = " + seed); + Random random = new Random(seed); + int MAX_TESTS = 20; + LongWritable key; + BytesWritable value; + + for (int i = 0; i < MAX_TESTS; i++) { + LOG.info("----------------------------------------------------------"); + // Maximum total records of 999 + int totalRecords = random.nextInt(999)+1; + // Test an empty file + if (i == 8) { + totalRecords = 0; + } + // Maximum bytes in a record of 100K + int recordLength = random.nextInt(1024*100)+1; + // For the 11th test, force a record length of 1 + if (i == 10) { + recordLength = 1; + } + // The total bytes in the test file + int fileSize = (totalRecords * recordLength); + LOG.info("totalRecords=" + totalRecords + " recordLength=" + + recordLength); + // Create the test file + ArrayList recordList = + createFile(file, codec, recordLength, totalRecords); + assertTrue(localFs.exists(file)); + // Set the fixed length 
record length config property + Configuration testConf = new Configuration(defaultConf); + FixedLengthInputFormat.setRecordLength(testConf, recordLength); + + int numSplits = 1; + // Arbitrarily set number of splits. + if (i > 0) { + if (i == (MAX_TESTS-1)) { + // Test a split size that is less than record len + numSplits = (int)(fileSize/Math.floor(recordLength/2)); + } else { + if (MAX_TESTS % i == 0) { + // Let us create a split size that is forced to be + // smaller than the end file itself, (ensures 1+ splits) + numSplits = fileSize/(fileSize - random.nextInt(fileSize)); + } else { + // Just pick a random split size with no upper bound + numSplits = Math.max(1, fileSize/random.nextInt(Integer.MAX_VALUE)); + } + } + LOG.info("Number of splits set to: " + numSplits); + } + testConf.setLong("mapreduce.input.fileinputformat.split.maxsize", + (long)(fileSize/numSplits)); + + // Create the job, and setup the input path + Job job = Job.getInstance(testConf); + FileInputFormat.setInputPaths(job, workDir); + // Try splitting the file in a variety of sizes + FixedLengthInputFormat format = new FixedLengthInputFormat(); + List splits = format.getSplits(job); + LOG.info("Actual number of splits = " + splits.size()); + // Test combined split lengths = total file size + long recordOffset = 0; + int recordNumber = 0; + for (InputSplit split : splits) { + TaskAttemptContext context = MapReduceTestUtil. + createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + Class clazz = reader.getClass(); + assertEquals("RecordReader class should be FixedLengthRecordReader:", + FixedLengthRecordReader.class, clazz); + // Plow through the records in this split + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + assertEquals("Checking key", (long)(recordNumber*recordLength), + key.get()); + String valueString = new String(value.getBytes(), 0, + value.getLength()); + assertEquals("Checking record length:", recordLength, + value.getLength()); + assertTrue("Checking for more records than expected:", + recordNumber < totalRecords); + String origRecord = recordList.get(recordNumber); + assertEquals("Checking record content:", origRecord, valueString); + recordNumber++; + } + reader.close(); + } + assertEquals("Total original records should be total read records:", + recordList.size(), recordNumber); + } + } + + private static void writeFile(FileSystem fs, Path name, + CompressionCodec codec, + String contents) throws IOException { + OutputStream stm; + if (codec == null) { + stm = fs.create(name); + } else { + stm = codec.createOutputStream(fs.create(name)); + } + stm.write(contents.getBytes()); + stm.close(); + } + + private static List readSplit(FixedLengthInputFormat format, + InputSplit split, + Job job) throws Exception { + List result = new ArrayList(); + TaskAttemptContext context = MapReduceTestUtil. 
+ createDummyMapTaskAttemptContext(job.getConfiguration()); + RecordReader reader = + format.createRecordReader(split, context); + MapContext + mcontext = + new MapContextImpl(job.getConfiguration(), context.getTaskAttemptID(), + reader, null, null, MapReduceTestUtil.createDummyReporter(), split); + reader.initialize(split, mcontext); + LongWritable key; + BytesWritable value; + while (reader.nextKeyValue()) { + key = reader.getCurrentKey(); + value = reader.getCurrentValue(); + result.add(new String(value.getBytes(), 0, value.getLength())); + } + reader.close(); + return result; + } + + private void runPartialRecordTest(CompressionCodec codec) throws Exception { + localFs.delete(workDir, true); + // Create a file with fixed length records with 5 byte long + // records with a partial record at the end. + StringBuilder fileName = new StringBuilder("testFormat.txt"); + if (codec != null) { + fileName.append(".gz"); + ReflectionUtils.setConf(codec, defaultConf); + } + writeFile(localFs, new Path(workDir, fileName.toString()), codec, + "one two threefour five six seveneightnine ten"); + FixedLengthInputFormat format = new FixedLengthInputFormat(); + format.setRecordLength(defaultConf, 5); + Job job = Job.getInstance(defaultConf); + FileInputFormat.setInputPaths(job, workDir); + List splits = format.getSplits(job); + if (codec != null) { + assertEquals("compressed splits == 1", 1, splits.size()); + } + boolean exceptionThrown = false; + for (InputSplit split : splits) { + try { + List results = readSplit(format, split, job); + } catch(IOException ioe) { + exceptionThrown = true; + LOG.info("Exception message:" + ioe.getMessage()); + } + } + assertTrue("Exception for partial record:", exceptionThrown); + } + +}
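
For readers who want to see the new input format in action, here is a minimal, hypothetical driver sketch (not part of this patch) showing how a job could wire up the new-API FixedLengthInputFormat introduced above. The 20-byte record length, the FixedLengthDriver class name, the input/output paths, and the use of the identity Mapper are illustrative assumptions; only setRecordLength(), the LongWritable offset keys, and the BytesWritable record values come from the patch itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class FixedLengthDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The record length is mandatory: createRecordReader() throws an IOException
    // if it is unset or not greater than zero. 20 bytes is a hypothetical value.
    FixedLengthInputFormat.setRecordLength(conf, 20);

    Job job = Job.getInstance(conf, "fixed-length-read");
    job.setJarByClass(FixedLengthDriver.class);
    job.setInputFormatClass(FixedLengthInputFormat.class);
    // Keys are record offsets within the file, values are the raw record bytes.
    job.setMapperClass(Mapper.class);   // identity mapper, for illustration only
    job.setNumReduceTasks(0);           // map-only pass-through of the records
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));   // dir of fixed-length files
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}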