svn merge -c 1407118 FIXES: MAPREDUCE-4772. Fetch failures can take way too long for a map to be restarted (bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1407126 13f79535-47bb-0310-9956-ffa450edef68
parent 1eb5c00016
commit 6b3db6eabd
CHANGES.txt
@@ -490,6 +490,9 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
     configured (jlowe via bobby)
 
+    MAPREDUCE-4772. Fetch failures can take way too long for a map to be
+    restarted (bobby)
+
 Release 0.23.4 - UNRELEASED
 
JobImpl.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -1409,16 +1410,22 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
       job.fetchFailuresMapping.put(mapId, fetchFailures);
 
-      //get number of running reduces
-      int runningReduceTasks = 0;
+      //get number of shuffling reduces
+      int shufflingReduceTasks = 0;
       for (TaskId taskId : job.reduceTasks) {
-        if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
-          runningReduceTasks++;
+        Task task = job.tasks.get(taskId);
+        if (TaskState.RUNNING.equals(task.getState())) {
+          for(TaskAttempt attempt : task.getAttempts().values()) {
+            if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
+              shufflingReduceTasks++;
+              break;
+            }
+          }
         }
       }
 
-      float failureRate = runningReduceTasks == 0 ? 1.0f :
-          (float) fetchFailures / runningReduceTasks;
+      float failureRate = shufflingReduceTasks == 0 ? 1.0f :
+          (float) fetchFailures / shufflingReduceTasks;
       // declare faulty if fetch-failures >= max-allowed-failures
       boolean isMapFaulty =
           (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
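The substance of the JobImpl change is the denominator of the failure rate: only reducers still fetching (RUNNING tasks with at least one attempt in the SHUFFLE phase) are counted, so fetch failures reported by the last shuffling reducers are no longer diluted by reducers that have already moved on to merging and reducing. A minimal standalone sketch of the heuristic, assuming 0.5 for JobImpl's MAX_ALLOWED_FETCH_FAILURES_FRACTION (the constant is not part of this diff) and using illustrative names rather than the AM's API:

public final class FetchFailureHeuristic {
  // Assumed to mirror JobImpl.MAX_ALLOWED_FETCH_FAILURES_FRACTION.
  private static final float MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;

  static boolean isMapFaulty(int fetchFailures, int shufflingReduceTasks) {
    // No reducer left in SHUFFLE: treat any reported failure as decisive.
    float failureRate = shufflingReduceTasks == 0
        ? 1.0f : (float) fetchFailures / shufflingReduceTasks;
    return failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION;
  }

  public static void main(String[] args) {
    // Old denominator (all running reducers): one failure over three
    // reducers is a rate of 0.33, below threshold, so no re-run.
    System.out.println(isMapFaulty(1, 3)); // false
    // New denominator: if only one of those reducers is still shuffling,
    // the same lone failure is a rate of 1.0 and the map is declared faulty.
    System.out.println(isMapFaulty(1, 1)); // true
  }
}

The real JobImpl additionally gates the relaunch on a minimum number of fetch-failure notifications, which this sketch omits.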
TestFetchFailure.java
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;

@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Assert;
 import org.junit.Test;
@@ -254,6 +260,169 @@ public class TestFetchFailure {
     events = job.getTaskAttemptCompletionEvents(0, 100);
     Assert.assertEquals("Num completion events not correct", 2, events.length);
   }
+
+  @Test
+  public void testFetchFailureMultipleReduces() throws Exception {
+    MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+       4, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    Task reduceTask2 = it.next();
+    Task reduceTask3 = it.next();
+
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events =
+      job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    app.waitForState(reduceTask3, TaskState.RUNNING);
+    TaskAttempt reduceAttempt =
+      reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    updateStatus(app, reduceAttempt, Phase.SHUFFLE);
+
+    TaskAttempt reduceAttempt2 =
+      reduceTask2.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
+
+    TaskAttempt reduceAttempt3 =
+      reduceTask3.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
+
+    //send 3 fetch failures from reduce to trigger map re execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //We should not re-launch the map task yet
+    assertEquals(TaskState.SUCCEEDED, mapTask.getState());
+    updateStatus(app, reduceAttempt2, Phase.REDUCE);
+    updateStatus(app, reduceAttempt3, Phase.REDUCE);
+
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    //map attempt must have become FAILED
+    Assert.assertEquals("Map TaskAttempt state not correct",
+        TaskAttemptState.FAILED, mapAttempt1.getState());
+
+    Assert.assertEquals("Num attempts in Map Task not correct",
+        2, mapTask.getAttempts().size());
+
+    Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+    atIt.next();
+    TaskAttempt mapAttempt2 = atIt.next();
+
+    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+    //send the done signal to the second map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt3.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    //previous completion event now becomes obsolete
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        6, events.length);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[0].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[1].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt2.getID(), events[2].getAttemptId());
+    Assert.assertEquals("Event reduce attempt id not correct",
+        reduceAttempt.getID(), events[3].getAttemptId());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt2",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+    Assert.assertEquals("Event status not correct for reduce attempt1",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+    TaskAttemptCompletionEvent mapEvents[] =
+        job.getMapAttemptCompletionEvents(0, 2);
+    Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+    Assert.assertArrayEquals("Unexpected map events",
+        Arrays.copyOfRange(events, 0, 2), mapEvents);
+    mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+    Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+  }
+
+  private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
+    TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
+    status.counters = new Counters();
+    status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+    status.id = attempt.getID();
+    status.mapFinishTime = 0;
+    status.outputSize = 0;
+    status.phase = phase;
+    status.progress = 0.5f;
+    status.shuffleFinishTime = 0;
+    status.sortFinishTime = 0;
+    status.stateString = "OK";
+    status.taskState = attempt.getState();
+    TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
+        status);
+    app.getContext().getEventHandler().handle(event);
+  }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
       TaskAttempt mapAttempt) {
MRJobConfig.java
@@ -262,6 +262,9 @@ public interface MRJobConfig {
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+  public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
+  public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
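Jobs can override the new retry-delay cap per submission. A sketch of the intended usage; the 10-second value is an arbitrary example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class RetryDelayCapExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap shuffle fetch retry back-off at 10 s instead of the 60 s default.
    conf.setLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, 10000L);
    long cap = conf.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
    System.out.println("retry-delay cap: " + cap + " ms");
  }
}

The same key can also be set cluster-wide; the mapred-default.xml hunk later in this commit documents the 60000 ms default.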
Fetcher.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;

@@ -283,6 +284,7 @@ class Fetcher<K,V> extends Thread {
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
+      boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
       LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
                " map outputs", ie);

@@ -291,14 +293,14 @@ class Fetcher<K,V> extends Thread {
       // indirectly penalizing the host
       if (!connectSucceeded) {
         for(TaskAttemptID left: remaining) {
-          scheduler.copyFailed(left, host, connectSucceeded);
+          scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
         }
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
         TaskAttemptID firstMap = maps.get(0);
-        scheduler.copyFailed(firstMap, host, connectSucceeded);
+        scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
       }
 
       // Add back all the remaining maps, WITHOUT marking them as failed

@@ -322,7 +324,7 @@ class Fetcher<K,V> extends Thread {
     if(failedTasks != null && failedTasks.length > 0) {
       LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
       for(TaskAttemptID left: failedTasks) {
-        scheduler.copyFailed(left, host, true);
+        scheduler.copyFailed(left, host, true, false);
       }
     }
 
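The only classification Fetcher adds is ie instanceof ConnectException: a failure to establish the connection at all (typically connection refused from a dead or restarted node) versus an I/O error on a connection that had already been set up. A toy illustration of the distinction, assuming nothing listens on the placeholder port:

import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URL;

public class ConnectExcptDemo {
  public static void main(String[] args) {
    try {
      // Placeholder address; port 9 (discard) is normally closed.
      HttpURLConnection conn = (HttpURLConnection)
          new URL("http://localhost:9/mapOutput").openConnection();
      conn.setConnectTimeout(1000);
      conn.connect(); // a refused connection surfaces as ConnectException
    } catch (IOException ie) {
      // Mirrors the new Fetcher logic: connectExcpt is true only when
      // the TCP connection could not be established.
      boolean connectExcpt = ie instanceof ConnectException;
      System.out.println("connectExcpt = " + connectExcpt);
    }
  }
}

Threading the flag through copyFailed lets the scheduler, in the hunks below, report such failures to the AM at once instead of waiting out several rounds of back-off.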
ShuffleScheduler.java
@@ -89,6 +89,7 @@ class ShuffleScheduler<K,V> {
   private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
 
   private boolean reportReadErrorImmediately = true;
+  private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
 
   public ShuffleScheduler(JobConf job, TaskStatus status,
                           ExceptionReporter reporter,

@@ -115,6 +116,9 @@ class ShuffleScheduler<K,V> {
         MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
     this.reportReadErrorImmediately = job.getBoolean(
         MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
+
+    this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
   }
 
   public synchronized void copySucceeded(TaskAttemptID mapId,

@@ -159,7 +163,7 @@ class ShuffleScheduler<K,V> {
   }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError) {
+                                      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {

@@ -184,12 +188,15 @@ class ShuffleScheduler<K,V> {
       }
     }
 
-    checkAndInformJobTracker(failures, mapId, readError);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
 
     checkReducerHealth();
 
     long delay = (long) (INITIAL_PENALTY *
        Math.pow(PENALTY_GROWTH_RATE, failures));
+    if (delay > maxDelay) {
+      delay = maxDelay;
+    }
 
     penalties.add(new Penalty(host, delay));
 

@@ -200,8 +207,9 @@ class ShuffleScheduler<K,V> {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
-      int failures, TaskAttemptID mapId, boolean readError) {
-    if ((reportReadErrorImmediately && readError)
+      int failures, TaskAttemptID mapId, boolean readError,
+      boolean connectExcpt) {
+    if (connectExcpt || (reportReadErrorImmediately && readError)
        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
      LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
      status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
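What the clamp tames: the retry penalty grows as INITIAL_PENALTY * PENALTY_GROWTH_RATE ^ failures. Assuming the contemporaneous ShuffleScheduler constants of 10000 ms and 1.3f (both are assumptions; neither appears in this diff), the raw delay passes one minute around the seventh failure and keeps climbing. A sketch of the arithmetic:

public class PenaltyCapDemo {
  public static void main(String[] args) {
    long initialPenalty = 10000L; // assumed INITIAL_PENALTY (10 s)
    double growthRate = 1.3;      // assumed PENALTY_GROWTH_RATE
    long maxDelay = 60000L;       // DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY
    for (int failures = 1; failures <= 12; failures++) {
      long raw = (long) (initialPenalty * Math.pow(growthRate, failures));
      System.out.printf("failures=%2d  raw=%7d ms  capped=%6d ms%n",
          failures, raw, Math.min(raw, maxDelay));
    }
  }
}

By twelve failures the uncapped wait is nearly four minutes, the "way too long" of the commit title; the capped schedule never waits more than the configured minute between retries.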
mapred-default.xml
@@ -321,6 +321,14 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name>
+  <value>60000</value>
+  <description>The maximum number of ms the reducer will delay before retrying
+  to download map data.
+  </description>
+</property>
+
 <property>
   <name>mapreduce.reduce.shuffle.parallelcopies</name>
   <value>5</value>
TestFetcher.java
@@ -118,8 +118,8 @@ public class TestFetcher {
         encHash);
 
     verify(allErrs).increment(1);
-    verify(ss).copyFailed(map1ID, host, true);
-    verify(ss).copyFailed(map2ID, host, true);
+    verify(ss).copyFailed(map1ID, host, true, false);
+    verify(ss).copyFailed(map2ID, host, true, false);
 
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));

@@ -178,8 +178,8 @@ public class TestFetcher {
         .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
         encHash);
     verify(allErrs, never()).increment(1);
-    verify(ss, never()).copyFailed(map1ID, host, true);
-    verify(ss, never()).copyFailed(map2ID, host, true);
+    verify(ss, never()).copyFailed(map1ID, host, true, false);
+    verify(ss, never()).copyFailed(map2ID, host, true, false);
 
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));