MAPREDUCE-5028. Fixed a bug in MapTask that was causing mappers to fail when a large value of io.sort.mb is set. Contributed by Karthik Kambatla.

svn merge --ignore-ancestry -c 1576170 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1576171 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2014-03-11 01:30:20 +00:00
parent cbe653f9a2
commit ad5c0e4613
8 changed files with 359 additions and 12 deletions

DataInputBuffer.java

@@ -90,7 +90,10 @@ public byte[] getData() {
/** Returns the current position in the input. */
public int getPosition() { return buffer.getPosition(); }
- /** Returns the length of the input. */
+ /**
+  * Returns the index one greater than the last valid character in the input
+  * stream buffer.
+  */
public int getLength() { return buffer.getLength(); }
}
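The reworded javadoc above is what the rest of this patch leans on: getLength() is the end offset of the valid region, not the number of readable bytes, so a consumer that starts at a non-zero position has to subtract getPosition(). A minimal sketch of that arithmetic (standalone use of org.apache.hadoop.io.DataInputBuffer; the values are made up for illustration, not taken from the patch):

  DataInputBuffer in = new DataInputBuffer();
  byte[] data = new byte[100];
  in.reset(data, 25, 50);                     // valid bytes are data[25..74]
  int end = in.getLength();                   // 75: one past the last valid byte
  int remaining = end - in.getPosition();     // 50 bytes actually readable
  // Passing 'end' where a byte count is expected overstates the segment,
  // which is what the ReduceContextImpl and InMemoryReader hunks below correct.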

CHANGES.txt

@@ -69,6 +69,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5780. SliveTest should use the specified path to get the
particular FileSystem instead of using the default FileSystem. (szetszwo)
+ MAPREDUCE-5028. Fixed a bug in MapTask that was causing mappers to fail
+ when a large value of io.sort.mb is set. (Karthik Kambatla via vinodkv)
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES

MapTask.java

@@ -1176,8 +1176,9 @@ private void setEquator(int pos) {
equator = pos;
// set index prior to first entry, aligned at meta boundary
final int aligned = pos - (pos % METASIZE);
- kvindex =
- ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+ // Cast one of the operands to long to avoid integer overflow
+ kvindex = (int)
+ (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
"(" + (kvindex * 4) + ")");
}
@@ -1192,8 +1193,9 @@ private void resetSpill() {
bufstart = bufend = e;
final int aligned = e - (e % METASIZE);
// set start/end to point to first meta record
- kvstart = kvend =
- ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+ // Cast one of the operands to long to avoid integer overflow
+ kvstart = kvend = (int)
+ (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}
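To see why the cast matters: with a large io.sort.mb the kvbuffer approaches 2 GB, and the intermediate sum aligned - METASIZE + kvbuffer.length no longer fits in an int, so kvindex went negative and the mapper failed. A rough, self-contained illustration (not part of the patch; it takes METASIZE = 16 as in MapTask, and 1536 is the largest value exercised by the new TestLargeSort below):

  int kvbufferLength = 1536 << 20;             // ~1.6 GB sort buffer
  int pos = kvbufferLength - 4;                // an equator near the end of the buffer
  int aligned = pos - (pos % 16);              // METASIZE = 16
  // Old arithmetic: the sum is about 3.2e9 and wraps to a negative int.
  int broken = ((aligned - 16 + kvbufferLength) % kvbufferLength) / 4;
  // Patched arithmetic: promote to long before adding, cast back after the modulo.
  int fixed = (int) (((long) aligned - 16 + kvbufferLength) % kvbufferLength) / 4;
  System.out.println(broken + " vs " + fixed); // negative vs. a valid meta index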

ReduceContextImpl.java

@@ -141,7 +141,8 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
key = keyDeserializer.deserialize(key);
DataInputBuffer nextVal = input.getValue();
- buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
+ buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
+ - nextVal.getPosition());
value = valueDeserializer.deserialize(value);
currentKeyLength = nextKey.getLength() - nextKey.getPosition();
@@ -205,7 +206,8 @@ public VALUEIN next() {
if (backupStore.hasNext()) {
backupStore.next();
DataInputBuffer next = backupStore.nextValue();
- buffer.reset(next.getData(), next.getPosition(), next.getLength());
+ buffer.reset(next.getData(), next.getPosition(), next.getLength()
+ - next.getPosition());
value = valueDeserializer.deserialize(value);
return value;
} else {

InMemoryReader.java

@@ -37,9 +37,9 @@
public class InMemoryReader<K, V> extends Reader<K, V> {
private final TaskAttemptID taskAttemptId;
private final MergeManagerImpl<K,V> merger;
- DataInputBuffer memDataIn = new DataInputBuffer();
- private int start;
- private int length;
+ private final DataInputBuffer memDataIn = new DataInputBuffer();
+ private final int start;
+ private final int length;
public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
byte[] data, int start, int length, Configuration conf)
@@ -50,14 +50,14 @@ public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
buffer = data;
bufferSize = (int)fileLength;
- memDataIn.reset(buffer, start, length);
+ memDataIn.reset(buffer, start, length - start);
this.start = start;
this.length = length;
}
@Override
public void reset(int offset) {
- memDataIn.reset(buffer, start + offset, length);
+ memDataIn.reset(buffer, start + offset, length - start - offset);
bytesRead = offset;
eof = false;
}
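Both corrected calls line up because the length field here is treated as the end offset of the segment inside the shared byte array rather than its size, so subtracting start (and any replay offset) turns it back into the byte count that DataInputBuffer.reset() expects. A small worked sketch with made-up numbers (assuming org.apache.hadoop.io.DataInputBuffer on the classpath; not taken from the patch):

  byte[] buffer = new byte[16384];
  int start = 4096, length = 12288;                // segment is buffer[4096..12287]
  DataInputBuffer memDataIn = new DataInputBuffer();
  memDataIn.reset(buffer, start, length - start);  // 8192 readable bytes
  // reset(100): skip 100 bytes into the segment but keep the same end marker
  memDataIn.reset(buffer, start + 100, length - start - 100);
  // memDataIn.getPosition() == 4196, memDataIn.getLength() == 12288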

LargeSorter.java

@@ -0,0 +1,269 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A sample MR job that helps with testing large sorts in the MapReduce
* framework. Mapper generates the specified number of bytes and pipes them
* to the reducers.
*
* <code>mapreduce.large-sorter.mbs-per-map</code> specifies the amount
* of data (in MBs) to generate per map. By default, this is twice the value
* of <code>mapreduce.task.io.sort.mb</code> or 1 GB if that is not specified
* either.
* <code>mapreduce.large-sorter.map-tasks</code> specifies the number of map
* tasks to run.
* <code>mapreduce.large-sorter.reduce-tasks</code> specifies the number of
* reduce tasks to run.
*/
public class LargeSorter extends Configured implements Tool {
private static final String LS_PREFIX = "mapreduce.large-sorter.";
public static final String MBS_PER_MAP = LS_PREFIX + "mbs-per-map";
public static final String NUM_MAP_TASKS = LS_PREFIX + "map-tasks";
public static final String NUM_REDUCE_TASKS = LS_PREFIX + "reduce-tasks";
private static final String MAX_VALUE = LS_PREFIX + "max-value";
private static final String MIN_VALUE = LS_PREFIX + "min-value";
private static final String MIN_KEY = LS_PREFIX + "min-key";
private static final String MAX_KEY = LS_PREFIX + "max-key";
/**
* User counters
*/
static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
/**
* A custom input format that creates virtual inputs of a single string
* for each map.
*/
static class RandomInputFormat extends InputFormat<Text, Text> {
/**
* Generate the requested number of file splits, with the filename
* set to the filename of the output file.
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> result = new ArrayList<InputSplit>();
Path outDir = FileOutputFormat.getOutputPath(job);
int numSplits =
job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
for(int i=0; i < numSplits; ++i) {
result.add(new FileSplit(
new Path(outDir, "dummy-split-" + i), 0, 1, null));
}
return result;
}
/**
* Return a single record (filename, "") where the filename is taken from
* the file split.
*/
static class RandomRecordReader extends RecordReader<Text, Text> {
Path name;
Text key = null;
Text value = new Text();
public RandomRecordReader(Path p) {
name = p;
}
public void initialize(InputSplit split,
TaskAttemptContext context)
throws IOException, InterruptedException {
}
public boolean nextKeyValue() {
if (name != null) {
key = new Text();
key.set(name.getName());
name = null;
return true;
}
return false;
}
public Text getCurrentKey() {
return key;
}
public Text getCurrentValue() {
return value;
}
public void close() {}
public float getProgress() {
return 0.0f;
}
}
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new RandomRecordReader(((FileSplit) split).getPath());
}
}
static class RandomMapper extends Mapper<WritableComparable, Writable,
BytesWritable, BytesWritable> {
private long numBytesToWrite;
private int minKeySize;
private int keySizeRange;
private int minValueSize;
private int valueSizeRange;
private Random random = new Random();
private BytesWritable randomKey = new BytesWritable();
private BytesWritable randomValue = new BytesWritable();
private void randomizeBytes(byte[] data, int offset, int length) {
for(int i=offset + length - 1; i >= offset; --i) {
data[i] = (byte) random.nextInt(256);
}
}
@Override
public void setup(Context context) {
Configuration conf = context.getConfiguration();
numBytesToWrite = 1024 * 1024 * conf.getLong(MBS_PER_MAP,
2 * conf.getInt(MRJobConfig.IO_SORT_MB, 512));
minKeySize = conf.getInt(MIN_KEY, 10);
keySizeRange =
conf.getInt(MAX_KEY, 1000) - minKeySize;
minValueSize = conf.getInt(MIN_VALUE, 0);
valueSizeRange =
conf.getInt(MAX_VALUE, 20000) - minValueSize;
}
/**
* Given an output filename, write a bunch of random records to it.
*/
public void map(WritableComparable key,
Writable value,
Context context) throws IOException,InterruptedException {
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
(keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
int valueLength = minValueSize +
(valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
context.write(randomKey, randomValue);
numBytesToWrite -= keyLength + valueLength;
context.getCounter(Counters.BYTES_WRITTEN).increment(keyLength + valueLength);
context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
if (++itemCount % 200 == 0) {
context.setStatus("wrote record " + itemCount + ". " +
numBytesToWrite + " bytes left.");
}
}
context.setStatus("done with " + itemCount + " records.");
}
}
static class Discarder extends Reducer<BytesWritable, BytesWritable,
WritableComparable, Writable> {
@Override
public void reduce(BytesWritable key, Iterable<BytesWritable> values,
Context context) throws IOException, InterruptedException {
// Do nothing
}
}
private void verifyNotZero(Configuration conf, String config) {
if (conf.getInt(config, 1) <= 0) {
throw new IllegalArgumentException(config + " should be > 0");
}
}
public int run(String[] args) throws Exception {
Path outDir = new Path(
LargeSorter.class.getName() + System.currentTimeMillis());
Configuration conf = getConf();
verifyNotZero(conf, MBS_PER_MAP);
verifyNotZero(conf, NUM_MAP_TASKS);
conf.setInt(MRJobConfig.NUM_MAPS, conf.getInt(NUM_MAP_TASKS, 2));
int ioSortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 512);
int mapMb = Math.max(2 * ioSortMb, conf.getInt(MRJobConfig.MAP_MEMORY_MB,
MRJobConfig.DEFAULT_MAP_MEMORY_MB));
conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMb);
conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx" + (mapMb - 200) + "m");
@SuppressWarnings("deprecation")
Job job = new Job(conf);
job.setJarByClass(LargeSorter.class);
job.setJobName("large-sorter");
FileOutputFormat.setOutputPath(job, outDir);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(RandomInputFormat.class);
job.setMapperClass(RandomMapper.class);
job.setReducerClass(Discarder.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(conf.getInt(NUM_REDUCE_TASKS, 1));
Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = 1;
try {
ret = job.waitForCompletion(true) ? 0 : 1;
} finally {
FileSystem.get(conf).delete(outDir, true);
}
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
return ret;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new LargeSorter(), args);
System.exit(res);
}
}
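A minimal sketch of driving the new tool programmatically (this mirrors what main() above and the new TestLargeSort below do, with the usual org.apache.hadoop.conf and org.apache.hadoop.util imports; the specific sizes are illustrative, not defaults):

  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.IO_SORT_MB, 1536);      // large in-memory sort buffer
  conf.setInt(LargeSorter.MBS_PER_MAP, 1536);     // each mapper emits ~1536 MB
  conf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
  int exitCode = ToolRunner.run(conf, new LargeSorter(), new String[0]);

The same job is also registered as "largesorter" in MapredTestDriver at the end of this commit, so it can be launched from the command line through that driver.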

TestLargeSort.java

@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class TestLargeSort {
MiniMRClientCluster cluster;
@Before
public void setup() throws IOException {
Configuration conf = new YarnConfiguration();
cluster = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
cluster.start();
}
@After
public void cleanup() throws IOException {
if (cluster != null) {
cluster.stop();
cluster = null;
}
}
@Test
public void testLargeSort() throws Exception {
String[] args = new String[0];
int[] ioSortMbs = {128, 256, 1536};
for (int ioSortMb : ioSortMbs) {
Configuration conf = new Configuration(cluster.getConfig());
conf.setInt(MRJobConfig.IO_SORT_MB, ioSortMb);
conf.setInt(LargeSorter.NUM_MAP_TASKS, 1);
conf.setInt(LargeSorter.MBS_PER_MAP, ioSortMb);
assertEquals("Large sort failed for " + ioSortMb, 0,
ToolRunner.run(conf, new LargeSorter(), args));
}
}
}

MapredTestDriver.java

@@ -29,6 +29,7 @@
import org.apache.hadoop.mapred.TestTextInputFormat;
import org.apache.hadoop.mapred.ThreadedMapBenchmark;
import org.apache.hadoop.mapreduce.FailJob;
+ import org.apache.hadoop.mapreduce.LargeSorter;
import org.apache.hadoop.mapreduce.MiniHadoopClusterManager;
import org.apache.hadoop.mapreduce.SleepJob;
import org.apache.hadoop.util.ProgramDriver;
@@ -104,6 +105,8 @@ public MapredTestDriver(ProgramDriver pgd) {
"HDFS Stress Test and Live Data Verification.");
pgd.addClass("minicluster", MiniHadoopClusterManager.class,
"Single process HDFS and MR cluster.");
+ pgd.addClass("largesorter", LargeSorter.class,
+ "Large-Sort tester");
} catch(Throwable e) {
e.printStackTrace();
}