MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits created by CombineFileInputFormat to fail. Contributed by Robert Parker and Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540813 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-11-11 19:22:38 +00:00
parent 9673baa7e8
commit 38a3b925e9
5 changed files with 107 additions and 178 deletions

View File

@ -197,6 +197,10 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7
(jeagles)
MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits
created by CombineFileInputFormat to fail (Robert Parker and Jason Lowe
via jlowe)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -39,6 +40,9 @@ import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* The class that is used by the Job clients to write splits (both the meta
* and the raw bytes parts)
@ -47,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public class JobSplitWriter {
private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
private static final byte[] SPLIT_FILE_HEADER;
@ -129,9 +134,10 @@ public class JobSplitWriter {
long currCount = out.getPos();
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
throw new IOException("Max block location exceeded for split: "
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations, maxBlockLocations);
}
info[i++] =
new JobSplit.SplitMetaInfo(
@ -159,9 +165,10 @@ public class JobSplitWriter {
long currLen = out.getPos();
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
throw new IOException("Max block location exceeded for split: "
LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
locations = Arrays.copyOf(locations,maxBlockLocations);
}
info[i++] = new JobSplit.SplitMetaInfo(
locations, offset,

View File

@ -82,6 +82,14 @@
</description>
</property>
<property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>
<description>The max number of block locations to store for each split for
locality calculation.
</description>
</property>
<property>
<name>mapreduce.job.split.metainfo.maxsize</name>
<value>10000000</value>

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.split;
import static org.junit.Assert.assertEquals;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.junit.Test;
public class TestJobSplitWriter {
private static final File TEST_DIR = new File(
System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")), "TestJobSplitWriter");
@Test
public void testMaxBlockLocationsNewSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new FileSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
@Test
public void testMaxBlockLocationsOldSplits() throws Exception {
TEST_DIR.mkdirs();
try {
Configuration conf = new Configuration();
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
FileSystem fs = FileSystem.getLocal(conf);
org.apache.hadoop.mapred.FileSplit split =
new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
new org.apache.hadoop.mapred.InputSplit[] { split });
JobSplit.TaskSplitMetaInfo[] infos =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("unexpected number of splits", 1, infos.length);
assertEquals("unexpected number of split locations",
4, infos[0].getLocations().length);
} finally {
FileUtil.fullyDelete(TEST_DIR);
}
}
}

View File

@ -1,176 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
/**
* A JUnit test to test limits on block locations
*/
public class TestBlockLimits extends TestCase {
private static String TEST_ROOT_DIR = new File(System.getProperty(
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
public void testWithLimits() throws IOException, InterruptedException,
ClassNotFoundException {
MiniMRClientCluster mr = null;
try {
mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
new Configuration());
runCustomFormat(mr);
} finally {
if (mr != null) {
mr.stop();
}
}
}
private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
JobConf job = new JobConf(mr.getConfig());
FileSystem fileSys = FileSystem.get(job);
Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
Path outDir = new Path(testDir, "out");
System.out.println("testDir= " + testDir);
fileSys.delete(testDir, true);
job.setInputFormat(MyInputFormat.class);
job.setOutputFormat(MyOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setNumMapTasks(100);
job.setNumReduceTasks(1);
job.set("non.std.out", outDir.toString());
try {
JobClient.runJob(job);
assertTrue(false);
} catch (IOException ie) {
System.out.println("Failed job " + StringUtils.stringifyException(ie));
} finally {
fileSys.delete(testDir, true);
}
}
static class MyMapper extends MapReduceBase implements
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
public void map(WritableComparable key, Writable value,
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
throws IOException {
}
}
static class MyReducer extends MapReduceBase implements
Reducer<WritableComparable, Writable, WritableComparable, Writable> {
public void reduce(WritableComparable key, Iterator<Writable> values,
OutputCollector<WritableComparable, Writable> output, Reporter reporter)
throws IOException {
}
}
private static class MyInputFormat implements InputFormat<IntWritable, Text> {
private static class MySplit implements InputSplit {
int first;
int length;
public MySplit() {
}
public MySplit(int first, int length) {
this.first = first;
this.length = length;
}
public String[] getLocations() {
return new String[200];
}
public long getLength() {
return length;
}
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, first);
WritableUtils.writeVInt(out, length);
}
public void readFields(DataInput in) throws IOException {
first = WritableUtils.readVInt(in);
length = WritableUtils.readVInt(in);
}
}
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
new MySplit(4, 2) };
}
public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return null;
}
}
static class MyOutputFormat implements OutputFormat {
static class MyRecordWriter implements RecordWriter<Object, Object> {
public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
}
public void write(Object key, Object value) throws IOException {
return;
}
public void close(Reporter reporter) throws IOException {
}
}
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
return new MyRecordWriter(new Path(job.get("non.std.out")), job);
}
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
}
}
}