diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c1a2216c225..b30c0852e51 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -60,6 +60,10 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7
(jeagles)
+ MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits
+ created by CombineFileInputFormat to fail (Robert Parker and Jason Lowe
+ via jlowe)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
index e6ecac5b012..eb10ae50694 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,9 @@ import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
* The class that is used by the Job clients to write splits (both the meta
* and the raw bytes parts)
@@ -47,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
public class JobSplitWriter {
+ private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
private static final byte[] SPLIT_FILE_HEADER;
@@ -129,9 +134,10 @@ public class JobSplitWriter {
long currCount = out.getPos();
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
- throw new IOException("Max block location exceeded for split: "
+ LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
+ locations = Arrays.copyOf(locations, maxBlockLocations);
}
info[i++] =
new JobSplit.SplitMetaInfo(
@@ -159,9 +165,10 @@ public class JobSplitWriter {
long currLen = out.getPos();
String[] locations = split.getLocations();
if (locations.length > maxBlockLocations) {
- throw new IOException("Max block location exceeded for split: "
+ LOG.warn("Max block location exceeded for split: "
+ split + " splitsize: " + locations.length +
" maxsize: " + maxBlockLocations);
+        locations = Arrays.copyOf(locations, maxBlockLocations);
}
info[i++] = new JobSplit.SplitMetaInfo(
locations, offset,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 75ee655091e..11f3d43e9da 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -285,6 +285,14 @@
+<property>
+  <name>mapreduce.job.max.split.locations</name>
+  <value>10</value>
+  <description>The max number of block locations to store for each split for
+  locality calculation.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.job.split.metainfo.maxsize</name>
   <value>10000000</value>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
new file mode 100644
index 00000000000..fb1a255cf56
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Test;
+
+public class TestJobSplitWriter {
+
+ private static final File TEST_DIR = new File(
+ System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")), "TestJobSplitWriter");
+
+ @Test
+ public void testMaxBlockLocationsNewSplits() throws Exception {
+ TEST_DIR.mkdirs();
+ try {
+ Configuration conf = new Configuration();
+ conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
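+      // Cap stored split locations at 4; the FileSplit below advertises 5,
+      // so one location should be dropped when the split file is written.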
+ Path submitDir = new Path(TEST_DIR.getAbsolutePath());
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
+ new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
+ JobSplitWriter.createSplitFiles(submitDir, conf, fs,
+ new FileSplit[] { split });
+ JobSplit.TaskSplitMetaInfo[] infos =
+ SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
+ submitDir);
+ assertEquals("unexpected number of splits", 1, infos.length);
+ assertEquals("unexpected number of split locations",
+ 4, infos[0].getLocations().length);
+ } finally {
+ FileUtil.fullyDelete(TEST_DIR);
+ }
+ }
+
+ @Test
+ public void testMaxBlockLocationsOldSplits() throws Exception {
+ TEST_DIR.mkdirs();
+ try {
+ Configuration conf = new Configuration();
+ conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
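+      // Same 4-location cap, exercising the old-API (mapred.InputSplit) path.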
+ Path submitDir = new Path(TEST_DIR.getAbsolutePath());
+ FileSystem fs = FileSystem.getLocal(conf);
+ org.apache.hadoop.mapred.FileSplit split =
+ new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
+ new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
+ JobSplitWriter.createSplitFiles(submitDir, conf, fs,
+ new org.apache.hadoop.mapred.InputSplit[] { split });
+ JobSplit.TaskSplitMetaInfo[] infos =
+ SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
+ submitDir);
+ assertEquals("unexpected number of splits", 1, infos.length);
+ assertEquals("unexpected number of split locations",
+ 4, infos[0].getLocations().length);
+ } finally {
+ FileUtil.fullyDelete(TEST_DIR);
+ }
+ }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
deleted file mode 100644
index d8b250ad45c..00000000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * A JUnit test to test limits on block locations
- */
-public class TestBlockLimits extends TestCase {
- private static String TEST_ROOT_DIR = new File(System.getProperty(
- "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
-
- public void testWithLimits() throws IOException, InterruptedException,
- ClassNotFoundException {
- MiniMRClientCluster mr = null;
- try {
- mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
- new Configuration());
- runCustomFormat(mr);
- } finally {
- if (mr != null) {
- mr.stop();
- }
- }
- }
-
- private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
- JobConf job = new JobConf(mr.getConfig());
- FileSystem fileSys = FileSystem.get(job);
- Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
- Path outDir = new Path(testDir, "out");
- System.out.println("testDir= " + testDir);
- fileSys.delete(testDir, true);
- job.setInputFormat(MyInputFormat.class);
- job.setOutputFormat(MyOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setMapperClass(MyMapper.class);
- job.setReducerClass(MyReducer.class);
- job.setNumMapTasks(100);
- job.setNumReduceTasks(1);
- job.set("non.std.out", outDir.toString());
- try {
- JobClient.runJob(job);
- assertTrue(false);
- } catch (IOException ie) {
- System.out.println("Failed job " + StringUtils.stringifyException(ie));
- } finally {
- fileSys.delete(testDir, true);
- }
-
- }
-
- static class MyMapper extends MapReduceBase implements
-      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
-
- public void map(WritableComparable key, Writable value,
-        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
- throws IOException {
- }
- }
-
- static class MyReducer extends MapReduceBase implements
-      Reducer<WritableComparable, Writable, WritableComparable, Writable> {
-    public void reduce(WritableComparable key, Iterator<Writable> values,
-        OutputCollector<WritableComparable, Writable> output, Reporter reporter)
- throws IOException {
- }
- }
-
-  private static class MyInputFormat implements InputFormat<IntWritable, IntWritable> {
-
- private static class MySplit implements InputSplit {
- int first;
- int length;
-
- public MySplit() {
- }
-
- public MySplit(int first, int length) {
- this.first = first;
- this.length = length;
- }
-
- public String[] getLocations() {
- return new String[200];
- }
-
- public long getLength() {
- return length;
- }
-
- public void write(DataOutput out) throws IOException {
- WritableUtils.writeVInt(out, first);
- WritableUtils.writeVInt(out, length);
- }
-
- public void readFields(DataInput in) throws IOException {
- first = WritableUtils.readVInt(in);
- length = WritableUtils.readVInt(in);
- }
- }
-
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
- new MySplit(4, 2) };
- }
-
-    public RecordReader<IntWritable, IntWritable> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter) throws IOException {
- return null;
- }
-
- }
-
- static class MyOutputFormat implements OutputFormat {
- static class MyRecordWriter implements RecordWriter