svn merge -c 1540813 FIXES: MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits created by CombineFileInputFormat to fail. Contributed by Robert Parker and Jason Lowe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1540819 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b458fc7d9c
commit
97dc10bee5
|
@ -60,6 +60,10 @@ Release 2.3.0 - UNRELEASED
|
||||||
MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7
|
MAPREDUCE-5585. TestCopyCommitter#testNoCommitAction Fails on JDK7
|
||||||
(jeagles)
|
(jeagles)
|
||||||
|
|
||||||
|
MAPREDUCE-5186. mapreduce.job.max.split.locations causes some splits
|
||||||
|
created by CombineFileInputFormat to fail (Robert Parker and Jason Lowe
|
||||||
|
via jlowe)
|
||||||
|
|
||||||
Release 2.2.1 - UNRELEASED
|
Release 2.2.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.split;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -39,6 +40,9 @@ import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The class that is used by the Job clients to write splits (both the meta
|
* The class that is used by the Job clients to write splits (both the meta
|
||||||
* and the raw bytes parts)
|
* and the raw bytes parts)
|
||||||
|
@ -47,6 +51,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public class JobSplitWriter {
|
public class JobSplitWriter {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(JobSplitWriter.class);
|
||||||
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
|
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
|
||||||
private static final byte[] SPLIT_FILE_HEADER;
|
private static final byte[] SPLIT_FILE_HEADER;
|
||||||
|
|
||||||
|
@ -129,9 +134,10 @@ public class JobSplitWriter {
|
||||||
long currCount = out.getPos();
|
long currCount = out.getPos();
|
||||||
String[] locations = split.getLocations();
|
String[] locations = split.getLocations();
|
||||||
if (locations.length > maxBlockLocations) {
|
if (locations.length > maxBlockLocations) {
|
||||||
throw new IOException("Max block location exceeded for split: "
|
LOG.warn("Max block location exceeded for split: "
|
||||||
+ split + " splitsize: " + locations.length +
|
+ split + " splitsize: " + locations.length +
|
||||||
" maxsize: " + maxBlockLocations);
|
" maxsize: " + maxBlockLocations);
|
||||||
|
locations = Arrays.copyOf(locations, maxBlockLocations);
|
||||||
}
|
}
|
||||||
info[i++] =
|
info[i++] =
|
||||||
new JobSplit.SplitMetaInfo(
|
new JobSplit.SplitMetaInfo(
|
||||||
|
@ -159,9 +165,10 @@ public class JobSplitWriter {
|
||||||
long currLen = out.getPos();
|
long currLen = out.getPos();
|
||||||
String[] locations = split.getLocations();
|
String[] locations = split.getLocations();
|
||||||
if (locations.length > maxBlockLocations) {
|
if (locations.length > maxBlockLocations) {
|
||||||
throw new IOException("Max block location exceeded for split: "
|
LOG.warn("Max block location exceeded for split: "
|
||||||
+ split + " splitsize: " + locations.length +
|
+ split + " splitsize: " + locations.length +
|
||||||
" maxsize: " + maxBlockLocations);
|
" maxsize: " + maxBlockLocations);
|
||||||
|
locations = Arrays.copyOf(locations,maxBlockLocations);
|
||||||
}
|
}
|
||||||
info[i++] = new JobSplit.SplitMetaInfo(
|
info[i++] = new JobSplit.SplitMetaInfo(
|
||||||
locations, offset,
|
locations, offset,
|
||||||
|
|
|
@ -285,6 +285,14 @@
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>mapreduce.job.max.split.locations</name>
|
||||||
|
<value>10</value>
|
||||||
|
<description>The max number of block locations to store for each split for
|
||||||
|
locality calculation.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.job.split.metainfo.maxsize</name>
|
<name>mapreduce.job.split.metainfo.maxsize</name>
|
||||||
<value>10000000</value>
|
<value>10000000</value>
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapreduce.split;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.JobID;
|
||||||
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestJobSplitWriter {
|
||||||
|
|
||||||
|
private static final File TEST_DIR = new File(
|
||||||
|
System.getProperty("test.build.data",
|
||||||
|
System.getProperty("java.io.tmpdir")), "TestJobSplitWriter");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxBlockLocationsNewSplits() throws Exception {
|
||||||
|
TEST_DIR.mkdirs();
|
||||||
|
try {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
|
||||||
|
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
FileSplit split = new FileSplit(new Path("/some/path"), 0, 1,
|
||||||
|
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
|
||||||
|
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
|
||||||
|
new FileSplit[] { split });
|
||||||
|
JobSplit.TaskSplitMetaInfo[] infos =
|
||||||
|
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
|
||||||
|
submitDir);
|
||||||
|
assertEquals("unexpected number of splits", 1, infos.length);
|
||||||
|
assertEquals("unexpected number of split locations",
|
||||||
|
4, infos[0].getLocations().length);
|
||||||
|
} finally {
|
||||||
|
FileUtil.fullyDelete(TEST_DIR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxBlockLocationsOldSplits() throws Exception {
|
||||||
|
TEST_DIR.mkdirs();
|
||||||
|
try {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 4);
|
||||||
|
Path submitDir = new Path(TEST_DIR.getAbsolutePath());
|
||||||
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
|
org.apache.hadoop.mapred.FileSplit split =
|
||||||
|
new org.apache.hadoop.mapred.FileSplit(new Path("/some/path"), 0, 1,
|
||||||
|
new String[] { "loc1", "loc2", "loc3", "loc4", "loc5" });
|
||||||
|
JobSplitWriter.createSplitFiles(submitDir, conf, fs,
|
||||||
|
new org.apache.hadoop.mapred.InputSplit[] { split });
|
||||||
|
JobSplit.TaskSplitMetaInfo[] infos =
|
||||||
|
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
|
||||||
|
submitDir);
|
||||||
|
assertEquals("unexpected number of splits", 1, infos.length);
|
||||||
|
assertEquals("unexpected number of split locations",
|
||||||
|
4, infos[0].getLocations().length);
|
||||||
|
} finally {
|
||||||
|
FileUtil.fullyDelete(TEST_DIR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,176 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Iterator;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.io.IntWritable;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.io.Writable;
|
|
||||||
import org.apache.hadoop.io.WritableComparable;
|
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
|
||||||
import org.apache.hadoop.util.Progressable;
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A JUnit test to test limits on block locations
|
|
||||||
*/
|
|
||||||
public class TestBlockLimits extends TestCase {
|
|
||||||
private static String TEST_ROOT_DIR = new File(System.getProperty(
|
|
||||||
"test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
|
|
||||||
|
|
||||||
public void testWithLimits() throws IOException, InterruptedException,
|
|
||||||
ClassNotFoundException {
|
|
||||||
MiniMRClientCluster mr = null;
|
|
||||||
try {
|
|
||||||
mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
|
|
||||||
new Configuration());
|
|
||||||
runCustomFormat(mr);
|
|
||||||
} finally {
|
|
||||||
if (mr != null) {
|
|
||||||
mr.stop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
|
|
||||||
JobConf job = new JobConf(mr.getConfig());
|
|
||||||
FileSystem fileSys = FileSystem.get(job);
|
|
||||||
Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
|
|
||||||
Path outDir = new Path(testDir, "out");
|
|
||||||
System.out.println("testDir= " + testDir);
|
|
||||||
fileSys.delete(testDir, true);
|
|
||||||
job.setInputFormat(MyInputFormat.class);
|
|
||||||
job.setOutputFormat(MyOutputFormat.class);
|
|
||||||
job.setOutputKeyClass(Text.class);
|
|
||||||
job.setOutputValueClass(IntWritable.class);
|
|
||||||
|
|
||||||
job.setMapperClass(MyMapper.class);
|
|
||||||
job.setReducerClass(MyReducer.class);
|
|
||||||
job.setNumMapTasks(100);
|
|
||||||
job.setNumReduceTasks(1);
|
|
||||||
job.set("non.std.out", outDir.toString());
|
|
||||||
try {
|
|
||||||
JobClient.runJob(job);
|
|
||||||
assertTrue(false);
|
|
||||||
} catch (IOException ie) {
|
|
||||||
System.out.println("Failed job " + StringUtils.stringifyException(ie));
|
|
||||||
} finally {
|
|
||||||
fileSys.delete(testDir, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static class MyMapper extends MapReduceBase implements
|
|
||||||
Mapper<WritableComparable, Writable, WritableComparable, Writable> {
|
|
||||||
|
|
||||||
public void map(WritableComparable key, Writable value,
|
|
||||||
OutputCollector<WritableComparable, Writable> out, Reporter reporter)
|
|
||||||
throws IOException {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static class MyReducer extends MapReduceBase implements
|
|
||||||
Reducer<WritableComparable, Writable, WritableComparable, Writable> {
|
|
||||||
public void reduce(WritableComparable key, Iterator<Writable> values,
|
|
||||||
OutputCollector<WritableComparable, Writable> output, Reporter reporter)
|
|
||||||
throws IOException {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class MyInputFormat implements InputFormat<IntWritable, Text> {
|
|
||||||
|
|
||||||
private static class MySplit implements InputSplit {
|
|
||||||
int first;
|
|
||||||
int length;
|
|
||||||
|
|
||||||
public MySplit() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public MySplit(int first, int length) {
|
|
||||||
this.first = first;
|
|
||||||
this.length = length;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String[] getLocations() {
|
|
||||||
return new String[200];
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getLength() {
|
|
||||||
return length;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void write(DataOutput out) throws IOException {
|
|
||||||
WritableUtils.writeVInt(out, first);
|
|
||||||
WritableUtils.writeVInt(out, length);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void readFields(DataInput in) throws IOException {
|
|
||||||
first = WritableUtils.readVInt(in);
|
|
||||||
length = WritableUtils.readVInt(in);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public InputSplit[] getSplits(JobConf job, int numSplits)
|
|
||||||
throws IOException {
|
|
||||||
return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
|
|
||||||
new MySplit(4, 2) };
|
|
||||||
}
|
|
||||||
|
|
||||||
public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
|
|
||||||
JobConf job, Reporter reporter) throws IOException {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static class MyOutputFormat implements OutputFormat {
|
|
||||||
static class MyRecordWriter implements RecordWriter<Object, Object> {
|
|
||||||
|
|
||||||
public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
|
|
||||||
}
|
|
||||||
|
|
||||||
public void write(Object key, Object value) throws IOException {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close(Reporter reporter) throws IOException {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
|
|
||||||
String name, Progressable progress) throws IOException {
|
|
||||||
return new MyRecordWriter(new Path(job.get("non.std.out")), job);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void checkOutputSpecs(FileSystem ignored, JobConf job)
|
|
||||||
throws IOException {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
Loading…
Reference in New Issue