svn merge -c 1410131 FIXES: MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely closed channels (jlowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1410132 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2012-11-16 00:55:56 +00:00
parent 99127a2e06
commit 34abf278dd
3 changed files with 126 additions and 11 deletions

View File

@ -534,6 +534,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
the RM (jlowe via bobby) the RM (jlowe via bobby)
MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely
closed channels (jlowe via bobby)
Release 0.23.4 Release 0.23.4

View File

@ -37,6 +37,7 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -45,6 +46,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
@ -120,10 +122,16 @@ public class ShuffleHandler extends AbstractService
public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
// pattern to identify errors related to the client closing the socket early
// idea borrowed from Netty SslHandler
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
Pattern.CASE_INSENSITIVE);
private int port; private int port;
private ChannelFactory selector; private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup(); private final ChannelGroup accepted = new DefaultChannelGroup();
private HttpPipelineFactory pipelineFact; protected HttpPipelineFactory pipelineFact;
private int sslFileBufferSize; private int sslFileBufferSize;
/** /**
@ -319,13 +327,17 @@ public class ShuffleHandler extends AbstractService
} }
} }
protected Shuffle getShuffle(Configuration conf) {
return new Shuffle(conf);
}
class HttpPipelineFactory implements ChannelPipelineFactory { class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE; final Shuffle SHUFFLE;
private SSLFactory sslFactory; private SSLFactory sslFactory;
public HttpPipelineFactory(Configuration conf) throws Exception { public HttpPipelineFactory(Configuration conf) throws Exception {
SHUFFLE = new Shuffle(conf); SHUFFLE = getShuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
@ -465,7 +477,7 @@ public class ShuffleHandler extends AbstractService
lastMap.addListener(ChannelFutureListener.CLOSE); lastMap.addListener(ChannelFutureListener.CLOSE);
} }
private void verifyRequest(String appid, ChannelHandlerContext ctx, protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri) HttpRequest request, HttpResponse response, URL requestUri)
throws IOException { throws IOException {
SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid); SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
@ -566,12 +578,12 @@ public class ShuffleHandler extends AbstractService
return writeFuture; return writeFuture;
} }
private void sendError(ChannelHandlerContext ctx, protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) { HttpResponseStatus status) {
sendError(ctx, "", status); sendError(ctx, "", status);
} }
private void sendError(ChannelHandlerContext ctx, String message, protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) { HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
@ -590,6 +602,16 @@ public class ShuffleHandler extends AbstractService
if (cause instanceof TooLongFrameException) { if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST); sendError(ctx, BAD_REQUEST);
return; return;
} else if (cause instanceof IOException) {
if (cause instanceof ClosedChannelException) {
LOG.debug("Ignoring closed channel error", cause);
return;
}
String message = String.valueOf(cause.getMessage());
if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
LOG.debug("Ignoring client socket close", cause);
return;
}
} }
LOG.error("Shuffle error: ", cause); LOG.error("Shuffle error: ", cause);

View File

@ -17,17 +17,35 @@
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MockitoMaker.make;
import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
import static org.apache.hadoop.test.MetricsAsserts.*; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
import static org.apache.hadoop.test.MockitoMaker.*;
public class TestShuffleHandler { public class TestShuffleHandler {
static final long MiB = 1024 * 1024; static final long MiB = 1024 * 1024;
@ -69,4 +87,76 @@ public class TestShuffleHandler {
assertCounter("ShuffleOutputsOK", succeeded, rb); assertCounter("ShuffleOutputsOK", succeeded, rb);
assertGauge("ShuffleConnections", connections, rb); assertGauge("ShuffleConnections", connections, rb);
} }
@Test
public void testClientClosesConnection() throws Exception {
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
}
@Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String jobId, String mapId, int reduce)
throws IOException {
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i=0; i<100000; ++i) {
header.write(dob);
}
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
if (failures.size() == 0) {
failures.add(new Error());
ctx.getChannel().close();
}
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
if (failures.size() == 0) {
failures.add(new Error());
ctx.getChannel().close();
}
}
};
}
};
shuffleHandler.init(conf);
shuffleHandler.start();
// simulate a reducer that closes early by reading a single shuffle header
// then closing the connection
URL url = new URL("http://127.0.0.1:"
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.connect();
DataInputStream input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
input.close();
shuffleHandler.stop();
Assert.assertTrue("sendError called when client closed connection",
failures.size() == 0);
}
} }