MAPREDUCE-5027. Shuffle does not limit number of outstanding connections (Robert Parker via jeagles)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1453099 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Turner Eagles 2013-03-06 00:07:25 +00:00
parent b7134a90e2
commit ea7aa7f80f
4 changed files with 128 additions and 10 deletions

View File

@ -582,6 +582,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi
Prakash via jlowe) Prakash via jlowe)
MAPREDUCE-5027. Shuffle does not limit number of outstanding connections
(Robert Parker via jeagles)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the

View File

@ -551,6 +551,14 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.shuffle.max.connections</name>
<value>0</value>
<description>Max allowed connections for the shuffle. Set to 0 (zero)
to indicate no limit on the number of connections.
</description>
</property>
<property> <property>
<name>mapreduce.reduce.markreset.buffer.percent</name> <name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value> <value>0.0</value>

View File

@ -88,6 +88,7 @@ import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
@ -121,7 +122,7 @@ public class ShuffleHandler extends AbstractService
public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
// pattern to identify errors related to the client closing the socket early // pattern to identify errors related to the client closing the socket early
// idea borrowed from Netty SslHandler // idea borrowed from Netty SslHandler
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
@ -133,15 +134,15 @@ public class ShuffleHandler extends AbstractService
private final ChannelGroup accepted = new DefaultChannelGroup(); private final ChannelGroup accepted = new DefaultChannelGroup();
protected HttpPipelineFactory pipelineFact; protected HttpPipelineFactory pipelineFact;
private int sslFileBufferSize; private int sslFileBufferSize;
/** /**
* Should the shuffle use posix_fadvise calls to manage the OS cache during * Should the shuffle use posix_fadvise calls to manage the OS cache during
* sendfile * sendfile
*/ */
private boolean manageOsCache; private boolean manageOsCache;
private int readaheadLength; private int readaheadLength;
private int maxShuffleConnections;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
public static final String MAPREDUCE_SHUFFLE_SERVICEID = public static final String MAPREDUCE_SHUFFLE_SERVICEID =
"mapreduce.shuffle"; "mapreduce.shuffle";
@ -159,6 +160,9 @@ public class ShuffleHandler extends AbstractService
public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
@Metrics(about="Shuffle output metrics", context="mapred") @Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener { static class ShuffleMetrics implements ChannelFutureListener {
@Metric("Shuffle output in bytes") @Metric("Shuffle output in bytes")
@ -270,6 +274,9 @@ public class ShuffleHandler extends AbstractService
readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
DEFAULT_SHUFFLE_READAHEAD_BYTES); DEFAULT_SHUFFLE_READAHEAD_BYTES);
maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
DEFAULT_MAX_SHUFFLE_CONNECTIONS);
ThreadFactory bossFactory = new ThreadFactoryBuilder() ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d") .setNameFormat("ShuffleHandler Netty Boss #%d")
.build(); .build();
@ -399,6 +406,21 @@ public class ShuffleHandler extends AbstractService
return ret; return ret;
} }
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
throws Exception {
if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
LOG.info(String.format("Current number of shuffle connections (%d) is " +
"greater than or equal to the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
evt.getChannel().close();
return;
}
accepted.add(evt.getChannel());
super.channelOpen(ctx, evt);
}
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
throws Exception { throws Exception {
@ -620,6 +642,5 @@ public class ShuffleHandler extends AbstractService
sendError(ctx, INTERNAL_SERVER_ERROR); sendError(ctx, INTERNAL_SERVER_ERROR);
} }
} }
} }
} }

View File

@ -24,13 +24,15 @@ import static org.apache.hadoop.test.MockitoMaker.make;
import static org.apache.hadoop.test.MockitoMaker.stub; import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@ -47,10 +49,13 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
@Test public void testSerializeMeta() throws Exception { public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
@Test (timeout = 10000)
public void testSerializeMeta() throws Exception {
assertEquals(1, ShuffleHandler.deserializeMetaData( assertEquals(1, ShuffleHandler.deserializeMetaData(
ShuffleHandler.serializeMetaData(1))); ShuffleHandler.serializeMetaData(1)));
assertEquals(-1, ShuffleHandler.deserializeMetaData( assertEquals(-1, ShuffleHandler.deserializeMetaData(
@ -59,7 +64,8 @@ public class TestShuffleHandler {
ShuffleHandler.serializeMetaData(8080))); ShuffleHandler.serializeMetaData(8080)));
} }
@Test public void testShuffleMetrics() throws Exception { @Test (timeout = 10000)
public void testShuffleMetrics() throws Exception {
MetricsSystem ms = new MetricsSystemImpl(); MetricsSystem ms = new MetricsSystemImpl();
ShuffleHandler sh = new ShuffleHandler(ms); ShuffleHandler sh = new ShuffleHandler(ms);
ChannelFuture cf = make(stub(ChannelFuture.class). ChannelFuture cf = make(stub(ChannelFuture.class).
@ -88,7 +94,7 @@ public class TestShuffleHandler {
assertGauge("ShuffleConnections", connections, rb); assertGauge("ShuffleConnections", connections, rb);
} }
@Test @Test (timeout = 10000)
public void testClientClosesConnection() throws Exception { public void testClientClosesConnection() throws Exception {
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -159,4 +165,84 @@ public class TestShuffleHandler {
Assert.assertTrue("sendError called when client closed connection", Assert.assertTrue("sendError called when client closed connection",
failures.size() == 0); failures.size() == 0);
} }
@Test (timeout = 10000)
public void testMaxConnections() throws Exception {
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
}
@Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String jobId, String mapId, int reduce)
throws IOException {
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
ShuffleHeader header =
new ShuffleHeader("dummy_header", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i=0; i<100000; ++i) {
header.write(dob);
}
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
};
shuffleHandler.init(conf);
shuffleHandler.start();
// setup connections
int connAttempts = 3;
HttpURLConnection conns[] = new HttpURLConnection[connAttempts];
for (int i = 0; i < connAttempts; i++) {
String URLstring = "http://127.0.0.1:"
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_"
+ i + "_0";
URL url = new URL(URLstring);
conns[i] = (HttpURLConnection)url.openConnection();
}
// Try to open numerous connections
for (int i = 0; i < connAttempts; i++) {
conns[i].connect();
}
//Ensure first connections are okay
conns[0].getInputStream();
int rc = conns[0].getResponseCode();
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
conns[1].getInputStream();
rc = conns[1].getResponseCode();
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
// This connection should be closed because it to above the limit
try {
conns[2].getInputStream();
rc = conns[2].getResponseCode();
Assert.fail("Expected a SocketException");
} catch (SocketException se) {
LOG.info("Expected - connection should not be open");
} catch (Exception e) {
Assert.fail("Expected a SocketException");
}
shuffleHandler.stop();
}
} }