MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the MapReduce shuffle-handler. Contributed by Rajesh Balamohan.

svn merge --ignore-ancestry -c 1580062 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1580063 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2014-03-21 21:44:24 +00:00
parent 8891c7d08b
commit 444af8fac5
4 changed files with 397 additions and 48 deletions

View File

@ -34,6 +34,9 @@ Release 2.4.0 - UNRELEASED
NEW FEATURES NEW FEATURES
MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the
MapReduce shuffle-handler. (Rajesh Balamohan via vinodkv)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the

View File

@ -362,6 +362,21 @@
</description> </description>
</property> </property>
<property>
<name>mapreduce.shuffle.connection-keep-alive.enable</name>
<value>false</value>
<description>set to true to support keep-alive connections.</description>
</property>
<property>
<name>mapreduce.shuffle.connection-keep-alive.timeout</name>
<value>5</value>
<description>The number of seconds a shuffle client attempts to retain
http connection. Refer "Keep-Alive: timeout=" header in
Http specification
</description>
</property>
<property> <property>
<name>mapreduce.task.timeout</name> <name>mapreduce.task.timeout</name>
<value>600000</value> <value>600000</value>

View File

@ -23,7 +23,6 @@ import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET; import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
@ -41,6 +40,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -110,6 +110,7 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil; import org.jboss.netty.util.CharsetUtil;
import org.mortbay.jetty.HttpHeaders;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -156,6 +157,21 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 13562; public static final int DEFAULT_SHUFFLE_PORT = 13562;
public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED =
"mapreduce.shuffle.connection-keep-alive.enable";
public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT =
"mapreduce.shuffle.connection-keep-alive.timeout";
public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; //seconds
public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
"mapreduce.shuffle.mapoutput-info.meta.cache.size";
public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
1000;
public static final String CONNECTION_CLOSE = "close";
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
"mapreduce.shuffle.ssl.file.buffer.size"; "mapreduce.shuffle.ssl.file.buffer.size";
@ -167,6 +183,9 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads"; public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 implies Netty default of 2 * number of available processors // 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0; public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
boolean connectionKeepAliveEnabled = false;
int connectionKeepAliveTimeOut;
int mapOutputMetaInfoCacheSize;
@Metrics(about="Shuffle output metrics", context="mapred") @Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener { static class ShuffleMetrics implements ChannelFutureListener {
@ -328,6 +347,15 @@ public class ShuffleHandler extends AuxiliaryService {
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
connectionKeepAliveEnabled =
conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
connectionKeepAliveTimeOut =
Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
mapOutputMetaInfoCacheSize =
Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
} }
@Override @Override
@ -459,6 +487,15 @@ public class ShuffleHandler extends AuxiliaryService {
} }
final Map<String,List<String>> q = final Map<String,List<String>> q =
new QueryStringDecoder(request.getUri()).getParameters(); new QueryStringDecoder(request.getUri()).getParameters();
final List<String> keepAliveList = q.get("keepAlive");
boolean keepAliveParam = false;
if (keepAliveList != null && keepAliveList.size() == 1) {
keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
if (LOG.isDebugEnabled()) {
LOG.debug("KeepAliveParam : " + keepAliveList
+ " : " + keepAliveParam);
}
}
final List<String> mapIds = splitMaps(q.get("map")); final List<String> mapIds = splitMaps(q.get("map"));
final List<String> reduceQ = q.get("reduce"); final List<String> reduceQ = q.get("reduce");
final List<String> jobQ = q.get("job"); final List<String> jobQ = q.get("job");
@ -466,7 +503,8 @@ public class ShuffleHandler extends AuxiliaryService {
LOG.debug("RECV: " + request.getUri() + LOG.debug("RECV: " + request.getUri() +
"\n mapId: " + mapIds + "\n mapId: " + mapIds +
"\n reduceId: " + reduceQ + "\n reduceId: " + reduceQ +
"\n jobId: " + jobQ); "\n jobId: " + jobQ +
"\n keepAlive: " + keepAliveParam);
} }
if (mapIds == null || reduceQ == null || jobQ == null) { if (mapIds == null || reduceQ == null || jobQ == null) {
@ -505,27 +543,46 @@ public class ShuffleHandler extends AuxiliaryService {
return; return;
} }
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
Channel ch = evt.getChannel(); Channel ch = evt.getChannel();
String user = userRsrc.get(jobId);
// $x/$user/appcache/$appId/output/$mapId
// TODO: Once Shuffle is out of NM, this can use MR APIs to convert
// between App and Job
String outputBasePathStr = getBaseLocation(jobId, user);
try {
populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
ch.write(response);
LOG.error("Shuffle error in populating headers :", e);
String errorMessage = getErrorMessage(e);
sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
return;
}
ch.write(response); ch.write(response);
// TODO refactor the following into the pipeline // TODO refactor the following into the pipeline
ChannelFuture lastMap = null; ChannelFuture lastMap = null;
for (String mapId : mapIds) { for (String mapId : mapIds) {
try { try {
MapOutputInfo info = mapOutputInfoMap.get(mapId);
if (info == null) {
info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
}
lastMap = lastMap =
sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId); sendMapOutput(ctx, ch, user, mapId,
reduceId, info);
if (null == lastMap) { if (null == lastMap) {
sendError(ctx, NOT_FOUND); sendError(ctx, NOT_FOUND);
return; return;
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Shuffle error :", e); LOG.error("Shuffle error :", e);
StringBuffer sb = new StringBuffer(e.getMessage()); String errorMessage = getErrorMessage(e);
Throwable t = e; sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
while (t.getCause() != null) {
sb.append(t.getCause().getMessage());
t = t.getCause();
}
sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR);
return; return;
} }
} }
@ -533,6 +590,99 @@ public class ShuffleHandler extends AuxiliaryService {
lastMap.addListener(ChannelFutureListener.CLOSE); lastMap.addListener(ChannelFutureListener.CLOSE);
} }
private String getErrorMessage(Throwable t) {
StringBuffer sb = new StringBuffer(t.getMessage());
while (t.getCause() != null) {
sb.append(t.getCause().getMessage());
t = t.getCause();
}
return sb.toString();
}
private String getBaseLocation(String jobId, String user) {
final JobID jobID = JobID.forName(jobId);
final ApplicationId appID =
ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
jobID.getId());
final String baseStr =
ContainerLocalizer.USERCACHE + "/" + user + "/"
+ ContainerLocalizer.APPCACHE + "/"
+ ConverterUtils.toString(appID) + "/output" + "/";
return baseStr;
}
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
// Index file
Path indexFileName =
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
IndexRecord info =
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
Path mapOutputFileName =
lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
if (LOG.isDebugEnabled()) {
LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
}
MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
return outputInfo;
}
protected void populateHeaders(List<String> mapIds, String outputBaseStr,
String user, int reduce, HttpRequest request, HttpResponse response,
boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
throws IOException {
long contentLength = 0;
for (String mapId : mapIds) {
String base = outputBaseStr + mapId;
MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
mapOutputInfoMap.put(mapId, outputInfo);
}
// Index file
Path indexFileName =
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
IndexRecord info =
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
ShuffleHeader header =
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
contentLength += info.partLength;
contentLength += dob.getLength();
}
// Now set the response headers.
setResponseHeaders(response, keepAliveParam, contentLength);
}
protected void setResponseHeaders(HttpResponse response,
boolean keepAliveParam, long contentLength) {
if (!connectionKeepAliveEnabled && !keepAliveParam) {
LOG.info("Setting connection close header...");
response.setHeader(HttpHeaders.CONNECTION, CONNECTION_CLOSE);
} else {
response.setHeader(HttpHeaders.CONTENT_LENGTH,
String.valueOf(contentLength));
response.setHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
response.setHeader(HttpHeaders.KEEP_ALIVE, "timeout="
+ connectionKeepAliveTimeOut);
LOG.info("Content Length in shuffle : " + contentLength);
}
}
class MapOutputInfo {
final Path mapOutputFileName;
final IndexRecord indexRecord;
MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
this.mapOutputFileName = mapOutputFileName;
this.indexRecord = indexRecord;
}
}
protected void verifyRequest(String appid, ChannelHandlerContext ctx, protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri) HttpRequest request, HttpResponse response, URL requestUri)
throws IOException { throws IOException {
@ -575,39 +725,16 @@ public class ShuffleHandler extends AuxiliaryService {
} }
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
String user, String jobId, String mapId, int reduce) String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
throws IOException { throws IOException {
// TODO replace w/ rsrc alloc final IndexRecord info = mapOutputInfo.indexRecord;
// $x/$user/appcache/$appId/output/$mapId
// TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job
JobID jobID = JobID.forName(jobId);
ApplicationId appID = ApplicationId.newInstance(
Long.parseLong(jobID.getJtIdentifier()), jobID.getId());
final String base =
ContainerLocalizer.USERCACHE + "/" + user + "/"
+ ContainerLocalizer.APPCACHE + "/"
+ ConverterUtils.toString(appID) + "/output" + "/" + mapId;
if (LOG.isDebugEnabled()) {
LOG.debug("DEBUG0 " + base);
}
// Index file
Path indexFileName = lDirAlloc.getLocalPathToRead(
base + "/file.out.index", conf);
// Map-output file
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
base + "/file.out", conf);
if (LOG.isDebugEnabled()) {
LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : "
+ indexFileName);
}
final IndexRecord info =
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
final ShuffleHeader header = final ShuffleHeader header =
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
final DataOutputBuffer dob = new DataOutputBuffer(); final DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob); header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
final File spillfile = new File(mapOutputFileName.toString()); final File spillfile =
new File(mapOutputInfo.mapOutputFileName.toString());
RandomAccessFile spill; RandomAccessFile spill;
try { try {
spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null); spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);

View File

@ -23,6 +23,8 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MockitoMaker.make; import static org.apache.hadoop.test.MockitoMaker.make;
import static org.apache.hadoop.test.MockitoMaker.stub; import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
@ -39,6 +41,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.zip.CheckedOutputStream; import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum; import java.util.zip.Checksum;
@ -69,17 +72,24 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mortbay.jetty.HttpHeaders;
public class TestShuffleHandler { public class TestShuffleHandler {
static final long MiB = 1024 * 1024; static final long MiB = 1024 * 1024;
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
/**
* Test the validation of ShuffleHandler's meta-data's serialization and
* de-serialization.
*
* @throws Exception exception
*/
@Test (timeout = 10000) @Test (timeout = 10000)
public void testSerializeMeta() throws Exception { public void testSerializeMeta() throws Exception {
assertEquals(1, ShuffleHandler.deserializeMetaData( assertEquals(1, ShuffleHandler.deserializeMetaData(
@ -90,6 +100,11 @@ public class TestShuffleHandler {
ShuffleHandler.serializeMetaData(8080))); ShuffleHandler.serializeMetaData(8080)));
} }
/**
* Validate shuffle connection and input/output metrics.
*
* @throws Exception exception
*/
@Test (timeout = 10000) @Test (timeout = 10000)
public void testShuffleMetrics() throws Exception { public void testShuffleMetrics() throws Exception {
MetricsSystem ms = new MetricsSystemImpl(); MetricsSystem ms = new MetricsSystemImpl();
@ -120,6 +135,11 @@ public class TestShuffleHandler {
assertGauge("ShuffleConnections", connections, rb); assertGauge("ShuffleConnections", connections, rb);
} }
/**
* Verify client prematurely closing a connection.
*
* @throws Exception exception.
*/
@Test (timeout = 10000) @Test (timeout = 10000)
public void testClientClosesConnection() throws Exception { public void testClientClosesConnection() throws Exception {
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
@ -130,6 +150,20 @@ public class TestShuffleHandler {
protected Shuffle getShuffle(Configuration conf) { protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing // replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) { return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
return null;
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Only set response headers and skip everything else
// send some dummy value for content-length
super.setResponseHeaders(response, keepAliveParam, 100);
}
@Override @Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx, protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri) HttpRequest request, HttpResponse response, URL requestUri)
@ -137,7 +171,8 @@ public class TestShuffleHandler {
} }
@Override @Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String jobId, String mapId, int reduce) Channel ch, String user, String mapId, int reduce,
MapOutputInfo info)
throws IOException { throws IOException {
// send a shuffle header and a lot of data down the channel // send a shuffle header and a lot of data down the channel
// to trigger a broken pipe // to trigger a broken pipe
@ -147,7 +182,7 @@ public class TestShuffleHandler {
header.write(dob); header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer(); dob = new DataOutputBuffer();
for (int i=0; i<100000; ++i) { for (int i = 0; i < 100000; ++i) {
header.write(dob); header.write(dob);
} }
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
@ -187,6 +222,7 @@ public class TestShuffleHandler {
conn.connect(); conn.connect();
DataInputStream input = new DataInputStream(conn.getInputStream()); DataInputStream input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
Assert.assertEquals("close", conn.getHeaderField(HttpHeaders.CONNECTION));
ShuffleHeader header = new ShuffleHeader(); ShuffleHeader header = new ShuffleHeader();
header.readFields(input); header.readFields(input);
input.close(); input.close();
@ -196,6 +232,147 @@ public class TestShuffleHandler {
failures.size() == 0); failures.size() == 0);
} }
@Test(timeout = 10000)
public void testKeepAlive() throws Exception {
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
// try setting to -ve keep alive timeout.
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(final Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
return null;
}
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Send some dummy data (populate content length details)
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
long contentLength = dob.getLength();
// for testing purpose;
// disable connectinKeepAliveEnabled if keepAliveParam is available
if (keepAliveParam) {
connectionKeepAliveEnabled = false;
}
super.setResponseHeaders(response, keepAliveParam, contentLength);
}
@Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, int reduce,
MapOutputInfo info) throws IOException {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
ShuffleHeader header =
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
if (failures.size() == 0) {
failures.add(new Error());
ctx.getChannel().close();
}
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
if (failures.size() == 0) {
failures.add(new Error());
ctx.getChannel().close();
}
}
};
}
};
shuffleHandler.init(conf);
shuffleHandler.start();
String shuffleBaseURL = "http://127.0.0.1:"
+ shuffleHandler.getConfig().get(
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
URL url =
new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ "map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect();
DataInputStream input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
conn.getHeaderField(HttpHeaders.CONNECTION));
Assert.assertEquals("timeout=1",
conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
input.close();
// For keepAlive via URL
url =
new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ "map=attempt_12345_1_m_1_0&keepAlive=true");
conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect();
input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
conn.getHeaderField(HttpHeaders.CONNECTION));
Assert.assertEquals("timeout=1",
conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
header = new ShuffleHeader();
header.readFields(input);
input.close();
}
/**
* simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
* header_name and sometimes a wrong version
*
* @throws Exception exception
*/
@Test (timeout = 10000) @Test (timeout = 10000)
public void testIncompatibleShuffleVersion() throws Exception { public void testIncompatibleShuffleVersion() throws Exception {
final int failureNum = 3; final int failureNum = 3;
@ -225,6 +402,11 @@ public class TestShuffleHandler {
shuffleHandler.close(); shuffleHandler.close();
} }
/**
* Validate the limit on number of shuffle connections.
*
* @throws Exception exception
*/
@Test (timeout = 10000) @Test (timeout = 10000)
public void testMaxConnections() throws Exception { public void testMaxConnections() throws Exception {
@ -236,14 +418,29 @@ public class TestShuffleHandler {
protected Shuffle getShuffle(Configuration conf) { protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing // replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) { return new Shuffle(conf) {
@Override
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
int reduce, String user) throws IOException {
// Do nothing.
return null;
}
@Override
protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request,
HttpResponse response, boolean keepAliveParam,
Map<String, MapOutputInfo> infoMap) throws IOException {
// Do nothing.
}
@Override @Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx, protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri) HttpRequest request, HttpResponse response, URL requestUri)
throws IOException { throws IOException {
// Do nothing.
} }
@Override @Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String jobId, String mapId, int reduce) Channel ch, String user, String mapId, int reduce,
MapOutputInfo info)
throws IOException { throws IOException {
// send a shuffle header and a lot of data down the channel // send a shuffle header and a lot of data down the channel
// to trigger a broken pipe // to trigger a broken pipe
@ -309,6 +506,12 @@ public class TestShuffleHandler {
shuffleHandler.stop(); shuffleHandler.stop();
} }
/**
* Validate the ownership of the map-output files being pulled in. The
* local-file-system owner of the file should match the user component in the
*
* @throws Exception exception
*/
@Test(timeout = 100000) @Test(timeout = 100000)
public void testMapFileAccess() throws IOException { public void testMapFileAccess() throws IOException {
// This will run only in NativeIO is enabled as SecureIOUtils need it // This will run only in NativeIO is enabled as SecureIOUtils need it
@ -323,7 +526,7 @@ public class TestShuffleHandler {
TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
ApplicationId appId = ApplicationId.newInstance(12345, 1); ApplicationId appId = ApplicationId.newInstance(12345, 1);
System.out.println(appId.toString()); LOG.info(appId.toString());
String appAttemptId = "attempt_12345_1_m_1_0"; String appAttemptId = "attempt_12345_1_m_1_0";
String user = "randomUser"; String user = "randomUser";
String reducerId = "0"; String reducerId = "0";
@ -341,6 +544,7 @@ public class TestShuffleHandler {
protected void verifyRequest(String appid, ChannelHandlerContext ctx, protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri) HttpRequest request, HttpResponse response, URL requestUri)
throws IOException { throws IOException {
// Do nothing.
} }
}; };
@ -393,7 +597,7 @@ public class TestShuffleHandler {
} }
} }
public static void createShuffleHandlerFiles(File logDir, String user, private static void createShuffleHandlerFiles(File logDir, String user,
String appId, String appAttemptId, Configuration conf, String appId, String appAttemptId, Configuration conf,
List<File> fileMap) throws IOException { List<File> fileMap) throws IOException {
String attemptDir = String attemptDir =
@ -412,7 +616,7 @@ public class TestShuffleHandler {
createMapOutputFile(mapOutputFile, conf); createMapOutputFile(mapOutputFile, conf);
} }
public static void private static void
createMapOutputFile(File mapOutputFile, Configuration conf) createMapOutputFile(File mapOutputFile, Configuration conf)
throws IOException { throws IOException {
FileOutputStream out = new FileOutputStream(mapOutputFile); FileOutputStream out = new FileOutputStream(mapOutputFile);
@ -422,7 +626,7 @@ public class TestShuffleHandler {
out.close(); out.close();
} }
public static void createIndexFile(File indexFile, Configuration conf) private static void createIndexFile(File indexFile, Configuration conf)
throws IOException { throws IOException {
if (indexFile.exists()) { if (indexFile.exists()) {
System.out.println("Deleting existing file"); System.out.println("Deleting existing file");