MAPREDUCE-7132. JobSplitWriter prints unnecessary warnings if EC(RS10,4) is used. Contributed by Peter Bacsko.

This commit is contained in:
Xiao Chen 2018-10-16 10:22:47 -07:00
parent 753f149fd3
commit 25f8fcb064
3 changed files with 130 additions and 2 deletions

View File

@ -78,7 +78,7 @@ public interface MRConfig {
"mapreduce.task.max.status.length"; "mapreduce.task.max.status.length";
public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512; public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10; public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 15;
public static final String MAX_BLOCK_LOCATIONS_KEY = public static final String MAX_BLOCK_LOCATIONS_KEY =
"mapreduce.job.max.split.locations"; "mapreduce.job.max.split.locations";

View File

@ -149,7 +149,7 @@
<property> <property>
<name>mapreduce.job.max.split.locations</name> <name>mapreduce.job.max.split.locations</name>
<value>10</value> <value>15</value>
<description>The max number of block locations to store for each split for <description>The max number of block locations to store for each split for
locality calculation. locality calculation.
</description> </description>

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.split;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests that maxBlockLocations default value is sufficient for RS-10-4.
*/
public class TestJobSplitWriterWithEC {
// This will ensure 14 block locations
private ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies
.getByID(SystemErasureCodingPolicies.RS_10_4_POLICY_ID);
private static final int BLOCKSIZE = 1024 * 1024 * 10;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private Configuration conf;
private Path submitDir;
private Path testFile;
@Before
public void setup() throws Exception {
Configuration hdfsConf = new HdfsConfiguration();
hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
String namenodeDir = new File(MiniDFSCluster.getBaseDirectory(), "name").
getAbsolutePath();
hdfsConf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, namenodeDir);
hdfsConf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, namenodeDir);
hdfsConf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
cluster = new MiniDFSCluster.Builder(hdfsConf).numDataNodes(15).build();
fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(ecPolicy.getName());
fs.setErasureCodingPolicy(new Path("/"), ecPolicy.getName());
cluster.waitActive();
conf = new Configuration();
submitDir = new Path("/");
testFile = new Path("/testfile");
DFSTestUtil.writeFile(fs, testFile,
StripedFileTestUtil.generateBytes(BLOCKSIZE));
conf.set(FileInputFormat.INPUT_DIR,
fs.getUri().toString() + testFile.toString());
}
@After
public void after() {
cluster.close();
}
@Test
public void testMaxBlockLocationsNewSplitsWithErasureCoding()
throws Exception {
Job job = Job.getInstance(conf);
final FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
final List<InputSplit> splits = fileInputFormat.getSplits(job);
JobSplitWriter.createSplitFiles(submitDir, conf, fs, splits);
validateSplitMetaInfo();
}
@Test
public void testMaxBlockLocationsOldSplitsWithErasureCoding()
throws Exception {
JobConf jobConf = new JobConf(conf);
org.apache.hadoop.mapred.TextInputFormat fileInputFormat
= new org.apache.hadoop.mapred.TextInputFormat();
fileInputFormat.configure(jobConf);
final org.apache.hadoop.mapred.InputSplit[] splits =
fileInputFormat.getSplits(jobConf, 1);
JobSplitWriter.createSplitFiles(submitDir, conf, fs, splits);
validateSplitMetaInfo();
}
private void validateSplitMetaInfo() throws IOException {
JobSplit.TaskSplitMetaInfo[] splitInfo =
SplitMetaInfoReader.readSplitMetaInfo(new JobID(), fs, conf,
submitDir);
assertEquals("Number of splits", 1, splitInfo.length);
assertEquals("Number of block locations", 14,
splitInfo[0].getLocations().length);
}
}