From 97dc10bee566e892712bf6d747b086251de7fa3d Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe <jlowe@apache.org>
Date: Mon, 11 Nov 2013 19:30:25 +0000
Subject: [PATCH] svn merge -c 1540813 FIXES: MAPREDUCE-5186.
 mapreduce.job.max.split.locations causes some splits created by
 CombineFileInputFormat to fail. Contributed by Robert Parker and Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1540819 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   4 +
 .../mapreduce/split/JobSplitWriter.java       |  11 +-
 .../src/main/resources/mapred-default.xml     |   8 +
 .../mapreduce/split/TestJobSplitWriter.java   |  86 +++++++++
 .../apache/hadoop/mapred/TestBlockLimits.java | 176 ------------------
 5 files changed, 107 insertions(+), 178 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
 delete mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c1a2216c225..b30c0852e51 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -60,6 +60,10 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7
     (jeagles)
 
+    MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits
+    created by CombineFileInputFormat to fail (Robert Parker and Jason Lowe
+    via jlowe)
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
index e6ecac5b012..eb10ae50694 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,9 @@ import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /**
  * The class that is used by the Job clients to write splits (both the meta
  * and the raw bytes parts)
@@ -47,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public class JobSplitWriter {
 
+  private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
   private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
   private static final byte[] SPLIT_FILE_HEADER;
 
@@ -129,9 +134,10 @@ public class JobSplitWriter {
       long currCount = out.getPos();
       String[] locations = split.getLocations();
       if (locations.length > maxBlockLocations) {
-        throw new IOException("Max block location exceeded for split: "
+        LOG.warn("Max block location exceeded for split: "
             + split + " splitsize: " + locations.length +
             " maxsize: " + maxBlockLocations);
+        locations = Arrays.copyOf(locations, maxBlockLocations);
       }
       info[i++] = new JobSplit.SplitMetaInfo(
           locations, offset,
@@ -159,9 +165,10 @@ public class JobSplitWriter {
       long currLen = out.getPos();
       String[] locations = split.getLocations();
       if (locations.length > maxBlockLocations) {
-        throw new IOException("Max block location exceeded for split: "
+        LOG.warn("Max block location exceeded for split: "
             + split + " splitsize: " + locations.length +
             " maxsize: " + maxBlockLocations);
+        locations = Arrays.copyOf(locations, maxBlockLocations);
       }
       info[i++] = new JobSplit.SplitMetaInfo(
           locations, offset,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 75ee655091e..11f3d43e9da 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -285,6 +285,14 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.job.max.split.locations</name>
+  <value>10</value>
+  <description>The max number of block locations to store for each split for
+  locality calculation.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.job.split.metainfo.maxsize</name>
   <value>10000000</value>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
new file mode 100644
index 00000000000..fb1a255cf56
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/split/TestJobSplitWriter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Test;
+
+public class TestJobSplitWriter {
+
+  private static final File TEST_DIR = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")), "TestJobSplitWriter");
+
+  @Test
+  public void testMaxBlockLocationsNewSplits() throws Exception {
+    TEST_DIR.mkdirs();
+    try {
+      Configuration conf = new Configuration();
+      conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
+      Path submitDir = new Path(TEST_DIR.getAbsolutePath());
+      FileSystem fs = FileSystem.getLocal(conf);
+      FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
+          new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
+      JobSplitWriter.createSplitFiles(submitDir, conf, fs,
+          new FileSplit[] { split });
+      JobSplit.TaskSplitMetaInfo[] infos =
+          SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
+              submitDir);
+      assertEquals("unexpected number of splits", 1, infos.length);
+      assertEquals("unexpected number of split locations",
+          4, infos[0].getLocations().length);
+    } finally {
+      FileUtil.fullyDelete(TEST_DIR);
+    }
+  }
+
+  @Test
+  public void testMaxBlockLocationsOldSplits() throws Exception {
+    TEST_DIR.mkdirs();
+    try {
+      Configuration conf = new Configuration();
+      conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
+      Path submitDir = new Path(TEST_DIR.getAbsolutePath());
+      FileSystem fs = FileSystem.getLocal(conf);
+      org.apache.hadoop.mapred.FileSplit split =
+          new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
+              new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
+      JobSplitWriter.createSplitFiles(submitDir, conf, fs,
+          new org.apache.hadoop.mapred.InputSplit[] { split });
+      JobSplit.TaskSplitMetaInfo[] infos =
+          SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
+              submitDir);
+      assertEquals("unexpected number of splits", 1, infos.length);
+      assertEquals("unexpected number of split locations",
+          4, infos[0].getLocations().length);
+    } finally {
+      FileUtil.fullyDelete(TEST_DIR);
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
deleted file mode 100644
index d8b250ad45c..00000000000
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * A JUnit test to test limits on block locations
- */
-public class TestBlockLimits extends TestCase {
-  private static String TEST_ROOT_DIR = new File(System.getProperty(
-      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
-
-  public void testWithLimits() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    MiniMRClientCluster mr = null;
-    try {
-      mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
-          new Configuration());
-      runCustomFormat(mr);
-    } finally {
-      if (mr != null) {
-        mr.stop();
-      }
-    }
-  }
-
-  private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
-    JobConf job = new JobConf(mr.getConfig());
-    FileSystem fileSys = FileSystem.get(job);
-    Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
-    Path outDir = new Path(testDir, "out");
-    System.out.println("testDir= " + testDir);
-    fileSys.delete(testDir, true);
-    job.setInputFormat(MyInputFormat.class);
-    job.setOutputFormat(MyOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-
-    job.setMapperClass(MyMapper.class);
-    job.setReducerClass(MyReducer.class);
-    job.setNumMapTasks(100);
-    job.setNumReduceTasks(1);
-    job.set("non.std.out", outDir.toString());
-    try {
-      JobClient.runJob(job);
-      assertTrue(false);
-    } catch (IOException ie) {
-      System.out.println("Failed job " + StringUtils.stringifyException(ie));
-    } finally {
-      fileSys.delete(testDir, true);
-    }
-
-  }
-
-  static class MyMapper extends MapReduceBase implements
-      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
-
-    public void map(WritableComparable key, Writable value,
-        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
-        throws IOException {
-    }
-  }
-
-  static class MyReducer extends MapReduceBase implements
-      Reducer<WritableComparable, Writable, WritableComparable, Writable> {
-    public void reduce(WritableComparable key, Iterator<Writable> values,
-        OutputCollector<WritableComparable, Writable> output, Reporter reporter)
-        throws IOException {
-    }
-  }
-
-  private static class MyInputFormat implements InputFormat<IntWritable, Text> {
-
-    private static class MySplit implements InputSplit {
-      int first;
-      int length;
-
-      public MySplit() {
-      }
-
-      public MySplit(int first, int length) {
-        this.first = first;
-        this.length = length;
-      }
-
-      public String[] getLocations() {
-        return new String[200];
-      }
-
-      public long getLength() {
-        return length;
-      }
-
-      public void write(DataOutput out) throws IOException {
-        WritableUtils.writeVInt(out, first);
-        WritableUtils.writeVInt(out, length);
-      }
-
-      public void readFields(DataInput in) throws IOException {
-        first = WritableUtils.readVInt(in);
-        length = WritableUtils.readVInt(in);
-      }
-    }
-
-    public InputSplit[] getSplits(JobConf job, int numSplits)
-        throws IOException {
-      return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
-          new MySplit(4, 2) };
-    }
-
-    public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
-        JobConf job, Reporter reporter) throws IOException {
-      return null;
-    }
-
-  }
-
-  static class MyOutputFormat implements OutputFormat {
-    static class MyRecordWriter implements RecordWriter<Object, Object> {
-
-      public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
-      }
-
-      public void write(Object key, Object value) throws IOException {
-        return;
-      }
-
-      public void close(Reporter reporter) throws IOException {
-      }
-    }
-
-    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
-        String name, Progressable progress) throws IOException {
-      return new MyRecordWriter(new Path(job.get("non.std.out")), job);
-    }
-
-    public void checkOutputSpecs(FileSystem ignored, JobConf job)
-        throws IOException {
-    }
-  }
-
-}
\ No newline at end of file
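
Reviewer note, appended after the patch and not intended to be applied with
it: the minimal sketch below illustrates the behavioral change. The class
name and location strings are hypothetical; only Arrays.copyOf, the warning,
and the mapreduce.job.max.split.locations default of 10 come from the patch.
Before this change an oversized location list (common with splits built by
CombineFileInputFormat) failed job submission with an IOException; now
JobSplitWriter logs a warning and truncates the list to the configured cap.

import java.util.Arrays;

// Illustrative sketch only -- not part of the patch.
public class SplitLocationTruncationDemo {
  public static void main(String[] args) {
    // Five hypothetical locations against a cap of 4, mirroring the new
    // unit tests in TestJobSplitWriter.
    String[] locations = { "loc1", "loc2", "loc3", "loc4", "loc5" };
    int maxBlockLocations = 4; // mapreduce.job.max.split.locations
    if (locations.length > maxBlockLocations) {
      // Old behavior: throw new IOException(...), failing job submission.
      // New behavior: warn and keep only the first maxBlockLocations
      // entries; the job loses a little locality instead of failing.
      locations = Arrays.copyOf(locations, maxBlockLocations);
    }
    System.out.println(Arrays.toString(locations)); // [loc1, loc2, loc3, loc4]
  }
}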