MAPREDUCE-6728. Give fetchers hint when ShuffleHandler rejects a shuffling connection (haibochen via rkanter)

This commit is contained in:
Robert Kanter 2016-10-21 17:46:17 -07:00
parent c473490da0
commit d4725bfcb2
6 changed files with 126 additions and 23 deletions

View File

@ -65,6 +65,11 @@ class Fetcher<K,V> extends Thread {
/* Default read timeout (in milliseconds) */ /* Default read timeout (in milliseconds) */
private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000; private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
// This should be kept in sync with ShuffleHandler.FETCH_RETRY_DELAY.
private static final long FETCH_RETRY_DELAY_DEFAULT = 1000L;
static final int TOO_MANY_REQ_STATUS_CODE = 429;
private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
protected final Reporter reporter; protected final Reporter reporter;
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE} CONNECTION, WRONG_REDUCE}
@ -269,6 +274,13 @@ class Fetcher<K,V> extends Thread {
} else { } else {
input = new DataInputStream(connection.getInputStream()); input = new DataInputStream(connection.getInputStream());
} }
} catch (TryAgainLaterException te) {
LOG.warn("Connection rejected by the host " + te.host +
". Will retry later.");
scheduler.penalize(host, te.backoff);
for (TaskAttemptID left : remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
} catch (IOException ie) { } catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException; boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1); ioErrs.increment(1);
@ -427,6 +439,19 @@ class Fetcher<K,V> extends Thread {
throws IOException { throws IOException {
// Validate response code // Validate response code
int rc = connection.getResponseCode(); int rc = connection.getResponseCode();
// See if the shuffleHandler rejected the connection due to too many
// reducer requests. If so, signal fetchers to back off.
if (rc == TOO_MANY_REQ_STATUS_CODE) {
long backoff = connection.getHeaderFieldLong(FETCH_RETRY_AFTER_HEADER,
FETCH_RETRY_DELAY_DEFAULT);
// in case we get a negative backoff from ShuffleHandler
if (backoff < 0) {
backoff = FETCH_RETRY_DELAY_DEFAULT;
LOG.warn("Get a negative backoff value from ShuffleHandler. Setting" +
" it to the default value " + FETCH_RETRY_DELAY_DEFAULT);
}
throw new TryAgainLaterException(backoff, url.getHost());
}
if (rc != HttpURLConnection.HTTP_OK) { if (rc != HttpURLConnection.HTTP_OK) {
throw new IOException( throw new IOException(
"Got invalid response code " + rc + " from " + url + "Got invalid response code " + rc + " from " + url +
@ -728,4 +753,15 @@ class Fetcher<K,V> extends Thread {
} }
} }
} }
private static class TryAgainLaterException extends IOException {
public final long backoff;
public final String host;
public TryAgainLaterException(long backoff, String host) {
super("Too many requests to a map host");
this.backoff = backoff;
this.host = host;
}
}
} }

View File

@ -75,10 +75,6 @@ public class MapHost {
state = State.BUSY; state = State.BUSY;
} }
public synchronized void markPenalized() {
state = State.PENALIZED;
}
public synchronized int getNumKnownMapOutputs() { public synchronized int getNumKnownMapOutputs() {
return maps.size(); return maps.size();
} }

View File

@ -35,6 +35,7 @@ import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -105,7 +106,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
private final boolean reportReadErrorImmediately; private final boolean reportReadErrorImmediately;
private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY; private long maxPenalty = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
private int maxHostFailures; private int maxHostFailures;
public ShuffleSchedulerImpl(JobConf job, TaskStatus status, public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
@ -136,7 +137,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
this.reportReadErrorImmediately = job.getBoolean( this.reportReadErrorImmediately = job.getBoolean(
MRJobConfig.SHUFFLE_NOTIFY_READERROR, true); MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, this.maxPenalty = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY); MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
this.maxHostFailures = job.getInt( this.maxHostFailures = job.getInt(
MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES, MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
@ -252,9 +253,26 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
} }
} }
@VisibleForTesting
synchronized int hostFailureCount(String hostname) {
int failures = 0;
if (hostFailures.containsKey(hostname)) {
failures = hostFailures.get(hostname).get();
}
return failures;
}
@VisibleForTesting
synchronized int fetchFailureCount(TaskAttemptID mapId) {
int failures = 0;
if (failureCounts.containsKey(mapId)) {
failures = failureCounts.get(mapId).get();
}
return failures;
}
public synchronized void copyFailed(TaskAttemptID mapId, MapHost host, public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
boolean readError, boolean connectExcpt) { boolean readError, boolean connectExcpt) {
host.penalize();
int failures = 1; int failures = 1;
if (failureCounts.containsKey(mapId)) { if (failureCounts.containsKey(mapId)) {
IntWritable x = failureCounts.get(mapId); IntWritable x = failureCounts.get(mapId);
@ -290,15 +308,22 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
long delay = (long) (INITIAL_PENALTY * long delay = (long) (INITIAL_PENALTY *
Math.pow(PENALTY_GROWTH_RATE, failures)); Math.pow(PENALTY_GROWTH_RATE, failures));
if (delay > maxDelay) { penalize(host, Math.min(delay, maxPenalty));
delay = maxDelay;
}
penalties.add(new Penalty(host, delay));
failedShuffleCounter.increment(1); failedShuffleCounter.increment(1);
} }
/**
* Ask the shuffle scheduler to penalize a given host for a given amount
* of time before it reassigns a new fetcher to fetch from the host.
* @param host The host to penalize.
* @param delay The time to wait for before retrying
*/
void penalize(MapHost host, long delay) {
host.penalize();
penalties.add(new Penalty(host, delay));
}
public void reportLocalError(IOException ioe) { public void reportLocalError(IOException ioe) {
try { try {
LOG.error("Shuffle failed : local error on this node: " LOG.error("Shuffle failed : local error on this node: "

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskID;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.TestName; import org.junit.rules.TestName;
@ -176,6 +177,27 @@ public class TestFetcher {
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
} }
@Test
public void testCopyFromHostConnectionRejected() throws Exception {
when(connection.getResponseCode())
.thenReturn(Fetcher.TOO_MANY_REQ_STATUS_CODE);
Fetcher<Text, Text> fetcher = new FakeFetcher<>(job, id, ss, mm, r, metrics,
except, key, connection);
fetcher.copyFromHost(host);
Assert.assertEquals("No host failure is expected.",
ss.hostFailureCount(host.getHostName()), 0);
Assert.assertEquals("No fetch failure is expected.",
ss.fetchFailureCount(map1ID), 0);
Assert.assertEquals("No fetch failure is expected.",
ss.fetchFailureCount(map2ID), 0);
verify(ss).penalize(eq(host), anyLong());
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
}
@Test @Test
public void testCopyFromHostBogusHeader() throws Exception { public void testCopyFromHostBogusHeader() throws Exception {

View File

@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB; import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB; import org.iq80.leveldb.DB;
@ -166,6 +165,12 @@ public class ShuffleHandler extends AuxiliaryService {
private static final String DATA_FILE_NAME = "file.out"; private static final String DATA_FILE_NAME = "file.out";
private static final String INDEX_FILE_NAME = "file.out.index"; private static final String INDEX_FILE_NAME = "file.out.index";
public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
new HttpResponseStatus(429, "TOO MANY REQUESTS");
// This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
public static final long FETCH_RETRY_DELAY = 1000L;
public static final String RETRY_AFTER_HEADER = "Retry-After";
private int port; private int port;
private ChannelFactory selector; private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup(); private final ChannelGroup accepted = new DefaultChannelGroup();
@ -795,7 +800,6 @@ public class ShuffleHandler extends AuxiliaryService {
} }
class Shuffle extends SimpleChannelUpstreamHandler { class Shuffle extends SimpleChannelUpstreamHandler {
private static final int MAX_WEIGHT = 10 * 1024 * 1024; private static final int MAX_WEIGHT = 10 * 1024 * 1024;
private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5; private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
private static final int ALLOWED_CONCURRENCY = 16; private static final int ALLOWED_CONCURRENCY = 16;
@ -875,7 +879,14 @@ public class ShuffleHandler extends AuxiliaryService {
LOG.info(String.format("Current number of shuffle connections (%d) is " + LOG.info(String.format("Current number of shuffle connections (%d) is " +
"greater than or equal to the max allowed shuffle connections (%d)", "greater than or equal to the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections)); accepted.size(), maxShuffleConnections));
evt.getChannel().close();
Map<String, String> headers = new HashMap<String, String>(1);
// notify fetchers to backoff for a while before closing the connection
// if the shuffle connection limit is hit. Fetchers are expected to
// handle this notification gracefully, that is, not treating this as a
// fetch failure.
headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
return; return;
} }
accepted.add(evt.getChannel()); accepted.add(evt.getChannel());
@ -1245,6 +1256,11 @@ public class ShuffleHandler extends AuxiliaryService {
protected void sendError(ChannelHandlerContext ctx, String message, protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) { HttpResponseStatus status) {
sendError(ctx, message, status, Collections.<String, String>emptyMap());
}
protected void sendError(ChannelHandlerContext ctx, String msg,
HttpResponseStatus status, Map<String, String> headers) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header // Put shuffle version into http header
@ -1252,8 +1268,11 @@ public class ShuffleHandler extends AuxiliaryService {
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION, response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
for (Map.Entry<String, String> header : headers.entrySet()) {
response.headers().set(header.getKey(), header.getValue());
}
response.setContent( response.setContent(
ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent. // Close the connection as soon as the error message is sent.
ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);

View File

@ -36,7 +36,6 @@ import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
@ -80,7 +79,6 @@ import org.apache.hadoop.yarn.server.records.Version;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.AbstractChannel;
@ -609,13 +607,20 @@ public class TestShuffleHandler {
// This connection should be closed because it to above the limit // This connection should be closed because it to above the limit
try { try {
conns[2].getInputStream();
rc = conns[2].getResponseCode(); rc = conns[2].getResponseCode();
Assert.fail("Expected a SocketException"); Assert.assertEquals("Expected a too-many-requests response code",
} catch (SocketException se) { ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
long backoff = Long.valueOf(
conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
conns[2].getInputStream();
Assert.fail("Expected an IOException");
} catch (IOException ioe) {
LOG.info("Expected - connection should not be open"); LOG.info("Expected - connection should not be open");
} catch (NumberFormatException ne) {
Assert.fail("Expected a numerical value for RETRY_AFTER header field");
} catch (Exception e) { } catch (Exception e) {
Assert.fail("Expected a SocketException"); Assert.fail("Expected a IOException");
} }
shuffleHandler.stop(); shuffleHandler.stop();