MAPREDUCE-6703. Add flag to allow MapReduce AM to request for OPPORTUNISTIC containers. Contributed by Arun Suresh

(cherry picked from commit ae353ea969)
This commit is contained in:
Jian He 2016-05-24 19:47:50 -07:00
parent 2ea17f4578
commit 396d222d7a
4 changed files with 416 additions and 69 deletions

View File

@ -102,6 +102,7 @@ public class RMContainerAllocator extends RMContainerRequestor
static final Priority PRIORITY_FAST_FAIL_MAP;
static final Priority PRIORITY_REDUCE;
static final Priority PRIORITY_MAP;
static final Priority PRIORITY_OPPORTUNISTIC_MAP;
@VisibleForTesting
public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
@ -117,6 +118,10 @@ public class RMContainerAllocator extends RMContainerRequestor
PRIORITY_REDUCE.setPriority(10);
PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
PRIORITY_MAP.setPriority(20);
PRIORITY_OPPORTUNISTIC_MAP =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
Priority.class);
PRIORITY_OPPORTUNISTIC_MAP.setPriority(19);
}
/*
@ -220,6 +225,9 @@ protected void serviceInit(Configuration conf) throws Exception {
// Init startTime to current time. If all goes well, it will be reset after
// first attempt to contact RM.
retrystartTime = System.currentTimeMillis();
this.scheduledRequests.setNumOpportunisticMapsPer100(
conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100,
MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100));
}
@Override
@ -832,6 +840,8 @@ private void applyConcurrentTaskLimits() {
setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
failedMapRequestLimit);
setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest,
normalMapRequestLimit);
}
int numScheduledReduces = scheduledRequests.reduces.size();
@ -959,6 +969,12 @@ class ScheduledRequests {
@VisibleForTesting
final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
int mapsMod100 = 0;
int numOpportunisticMapsPer100 = 0;
void setNumOpportunisticMapsPer100(int numMaps) {
this.numOpportunisticMapsPer100 = numMaps;
}
@VisibleForTesting
final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@ -1000,7 +1016,19 @@ void addMap(ContainerRequestEvent event) {
new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP,
mapNodeLabelExpression);
LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
// If its an earlier Failed attempt, do not retry as OPPORTUNISTIC
maps.put(event.getAttemptID(), request);
addContainerReq(request);
} else {
if (mapsMod100 < numOpportunisticMapsPer100) {
request =
new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP,
mapNodeLabelExpression);
maps.put(event.getAttemptID(), request);
addOpportunisticResourceRequest(request.priority, request.capability);
} else {
request =
new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
for (String host : event.getHosts()) {
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (list == null) {
@ -1012,7 +1040,7 @@ void addMap(ContainerRequestEvent event) {
LOG.debug("Added attempt req to host " + host);
}
}
for (String rack: event.getRacks()) {
for (String rack : event.getRacks()) {
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
if (list == null) {
list = new LinkedList<TaskAttemptId>();
@ -1022,13 +1050,14 @@ void addMap(ContainerRequestEvent event) {
if (LOG.isDebugEnabled()) {
LOG.debug("Added attempt req to rack " + rack);
}
}
request =
new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
}
maps.put(event.getAttemptID(), request);
addContainerReq(request);
}
mapsMod100++;
mapsMod100 %= 100;
}
}
void addReduce(ContainerRequest req) {
@ -1057,7 +1086,8 @@ private void assign(List<Container> allocatedContainers) {
Priority priority = allocated.getPriority();
Resource allocatedResource = allocated.getResource();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
|| PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
mapResourceRequest, getSchedulerResourceTypes()) <= 0
|| maps.isEmpty()) {
@ -1215,7 +1245,8 @@ private ContainerRequest getContainerReqToReplace(Container allocated) {
LOG.info("Found replacement: " + toBeReplaced);
return toBeReplaced;
}
else if (PRIORITY_MAP.equals(priority)) {
else if (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
LOG.info("Replacing MAP container " + allocated.getId());
// allocated container was for a map
String host = allocated.getNodeId().getHost();
@ -1278,7 +1309,9 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
assert (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
@ -1293,7 +1326,8 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
containerAssigned(allocated, assigned);
it.remove();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
new JobCounterUpdateEvent(assigned.attemptID.getTaskId()
.getJobId());
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
eventHandler.handle(jce);
hostLocalAssigned++;
@ -1304,13 +1338,16 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
}
}
}
}
// try to match all rack local
it = allocatedContainers.iterator();
while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
assert (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) {
// "if (maps.containsKey(tId))" below should be almost always true.
// hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
@ -1323,7 +1360,8 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
containerAssigned(allocated, assigned);
it.remove();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
new JobCounterUpdateEvent(assigned.attemptID.getTaskId()
.getJobId());
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
eventHandler.handle(jce);
rackLocalAssigned++;
@ -1334,13 +1372,15 @@ private void assignMapsWithLocality(List<Container> allocatedContainers) {
}
}
}
}
// assign remaining
it = allocatedContainers.iterator();
while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
assert (PRIORITY_MAP.equals(priority)
|| PRIORITY_OPPORTUNISTIC_MAP.equals(priority));
TaskAttemptId tId = maps.keySet().iterator().next();
ContainerRequest assigned = maps.remove(tId);
containerAssigned(allocated, assigned);

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
@ -424,8 +425,21 @@ protected void decContainerReq(ContainerRequest req) {
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}
protected void addOpportunisticResourceRequest(Priority priority,
Resource capability) {
addResourceRequest(priority, ResourceRequest.ANY, capability, null,
ExecutionType.OPPORTUNISTIC);
}
private void addResourceRequest(Priority priority, String resourceName,
Resource capability, String nodeLabelExpression) {
addResourceRequest(priority, resourceName, capability, nodeLabelExpression,
ExecutionType.GUARANTEED);
}
private void addResourceRequest(Priority priority, String resourceName,
Resource capability, String nodeLabelExpression,
ExecutionType executionType) {
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
this.remoteRequestsTable.get(priority);
if (remoteRequests == null) {
@ -448,6 +462,7 @@ private void addResourceRequest(Priority priority, String resourceName,
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
remoteRequest.setExecutionType(executionType);
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);

View File

@ -949,4 +949,15 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
128;
/**
* Number of OPPORTUNISTIC Containers per 100 containers that will be
* requested by the MRAppMaster. The Default value is 0, which implies all
* maps will be guaranteed. A value of 100 means all maps will be requested
* as opportunistic. For any other value say 'x', the FIRST 'x' maps
* requested by the AM will be opportunistic. If the total number of maps
* for the job is less than 'x', then ALL maps will be OPPORTUNISTIC
*/
public static final String MR_NUM_OPPORTUNISTIC_MAPS_PER_100 =
"mapreduce.job.num-opportunistic-maps-per-100";
public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100 = 0;
}

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import static org.junit.Assert.*;
/**
* Simple MapReduce to test ability of the MRAppMaster to request and use
* OPPORTUNISTIC containers.
* This test runs a simple external merge sort using MapReduce.
* The Hadoop framework's merge on the reduce side will merge the partitions
* created to generate the final output which is sorted on the key.
*/
@SuppressWarnings(value={"unchecked", "deprecation"})
public class TestMROpportunisticMaps {
// Where MR job's input will reside.
private static final Path INPUT_DIR = new Path("/test/input");
// Where output goes.
private static final Path OUTPUT = new Path("/test/output");
/**
* Test will run with 4 Maps, All OPPORTUNISTIC.
* @throws Exception
*/
@Test
public void testAllOpportunisticMaps() throws Exception {
doTest(4, 1, 1, 4);
}
/**
* Test will run with 4 Maps, 2 OPPORTUNISTIC and 2 GUARANTEED.
* @throws Exception
*/
@Test
public void testHalfOpportunisticMaps() throws Exception {
doTest(4, 1, 1, 2);
}
/**
* Test will run with 6 Maps and 2 Reducers. All the Maps are OPPORTUNISTIC.
* @throws Exception
*/
@Test
public void testMultipleReducers() throws Exception {
doTest(6, 2, 1, 6);
}
public void doTest(int numMappers, int numReducers, int numNodes,
int percent) throws Exception {
doTest(numMappers, numReducers, numNodes, 1000, percent);
}
public void doTest(int numMappers, int numReducers, int numNodes,
int numLines, int percent) throws Exception {
MiniDFSCluster dfsCluster = null;
MiniMRClientCluster mrCluster = null;
FileSystem fileSystem = null;
try {
Configuration conf = new Configuration();
// Start the mini-MR and mini-DFS clusters
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true);
dfsCluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numNodes).build();
fileSystem = dfsCluster.getFileSystem();
mrCluster = MiniMRClientClusterFactory.create(this.getClass(),
numNodes, conf);
// Generate input.
createInput(fileSystem, numMappers, numLines);
// Run the test.
Configuration jobConf = mrCluster.getConfig();
jobConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
runMergeTest(new JobConf(jobConf), fileSystem,
numMappers, numReducers, numLines, percent);
} finally {
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (mrCluster != null) {
mrCluster.stop();
}
}
}
private void createInput(FileSystem fs, int numMappers, int numLines)
throws Exception {
fs.delete(INPUT_DIR, true);
for (int i = 0; i < numMappers; i++) {
OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
Writer writer = new OutputStreamWriter(os);
for (int j = 0; j < numLines; j++) {
// Create sorted key, value pairs.
int k = j + 1;
String formattedNumber = String.format("%09d", k);
writer.write(formattedNumber + " " + formattedNumber + "\n");
}
writer.close();
}
}
private void runMergeTest(JobConf job, FileSystem fileSystem, int
numMappers, int numReducers, int numLines, int percent)
throws Exception {
fileSystem.delete(OUTPUT, true);
job.setJobName("Test");
JobClient client = new JobClient(job);
RunningJob submittedJob = null;
FileInputFormat.setInputPaths(job, INPUT_DIR);
FileOutputFormat.setOutputPath(job, OUTPUT);
job.set("mapreduce.output.textoutputformat.separator", " ");
job.setInputFormat(TextInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setPartitionerClass(MyPartitioner.class);
job.setOutputFormat(TextOutputFormat.class);
job.setNumReduceTasks(numReducers);
// All OPPORTUNISTIC
job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, percent);
job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1);
job.setInt("mapred.test.num_lines", numLines);
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
try {
submittedJob = client.submitJob(job);
try {
if (!client.monitorAndPrintJob(job, submittedJob)) {
throw new IOException("Job failed!");
}
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
}
} catch(IOException ioe) {
System.err.println("Job failed with: " + ioe);
} finally {
verifyOutput(fileSystem, numMappers, numLines);
}
}
private void verifyOutput(FileSystem fileSystem, int numMappers, int numLines)
throws Exception {
FSDataInputStream dis = null;
long numValidRecords = 0;
long numInvalidRecords = 0;
String prevKeyValue = "000000000";
Path[] fileList =
FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT,
new Utils.OutputFileUtils.OutputFilesFilter()));
for (Path outFile : fileList) {
try {
dis = fileSystem.open(outFile);
String record;
while((record = dis.readLine()) != null) {
// Split the line into key and value.
int blankPos = record.indexOf(" ");
String keyString = record.substring(0, blankPos);
String valueString = record.substring(blankPos+1);
// Check for sorted output and correctness of record.
if (keyString.compareTo(prevKeyValue) >= 0
&& keyString.equals(valueString)) {
prevKeyValue = keyString;
numValidRecords++;
} else {
numInvalidRecords++;
}
}
} finally {
if (dis != null) {
dis.close();
dis = null;
}
}
}
// Make sure we got all input records in the output in sorted order.
assertEquals((long)(numMappers * numLines), numValidRecords);
// Make sure there is no extraneous invalid record.
assertEquals(0, numInvalidRecords);
}
/**
* A mapper implementation that assumes that key text contains valid integers
* in displayable form.
*/
public static class MyMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, Text> {
private Text keyText;
private Text valueText;
public MyMapper() {
keyText = new Text();
valueText = new Text();
}
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String record = value.toString();
int blankPos = record.indexOf(" ");
keyText.set(record.substring(0, blankPos));
valueText.set(record.substring(blankPos+1));
output.collect(keyText, valueText);
}
public void close() throws IOException {
}
}
/**
* Partitioner implementation to make sure that output is in total sorted
* order. We basically route key ranges to different reducers such that
* key values monotonically increase with the partition number. For example,
* in a test with 4 reducers, the keys are numbers from 1 to 1000 in the
* form "000000001" to "000001000" in each input file. The keys "000000001"
* to "000000250" are routed to partition 0, "000000251" to "000000500" are
* routed to partition 1.
*/
static class MyPartitioner implements Partitioner<Text, Text> {
private JobConf job;
public MyPartitioner() {
}
public void configure(JobConf jobConf) {
this.job = jobConf;
}
public int getPartition(Text key, Text value, int numPartitions) {
int keyValue = 0;
try {
keyValue = Integer.parseInt(key.toString());
} catch(NumberFormatException nfe) {
keyValue = 0;
}
int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) /
job.getInt("mapred.test.num_lines", 10000);
return partitionNumber;
}
}
}