diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 4ca16781888..e60dd00ad05 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -173,6 +173,9 @@ Release 2.4.0 - UNRELEASED
NEW FEATURES
+ MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the
+ MapReduce shuffle-handler. (Rajesh Balamohan via vinodkv)
+
IMPROVEMENTS
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 50708b39334..a849347fa69 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -151,6 +151,21 @@
+
+ mapreduce.shuffle.connection-keep-alive.enable
+ false
+ set to true to support keep-alive connections.
+
+
+
+ mapreduce.shuffle.connection-keep-alive.timeout
+ 5
+ The number of seconds a shuffle client attempts to retain
+   an HTTP connection. Refer to the "Keep-Alive: timeout=" header in
+   the HTTP specification.
+
+
+
mapreduce.task.timeout
600000
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 9a192463b8e..6c0a74f6988 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -23,7 +23,6 @@ import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
@@ -41,6 +40,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -110,6 +110,7 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
+import org.mortbay.jetty.HttpHeaders;
import com.google.common.base.Charsets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -156,6 +157,21 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
public static final int DEFAULT_SHUFFLE_PORT = 13562;
+ public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED =
+ "mapreduce.shuffle.connection-keep-alive.enable";
+ public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
+
+ public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT =
+ "mapreduce.shuffle.connection-keep-alive.timeout";
+ public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; //seconds
+
+ public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+ "mapreduce.shuffle.mapoutput-info.meta.cache.size";
+ public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
+ 1000;
+
+ public static final String CONNECTION_CLOSE = "close";
+
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
"mapreduce.shuffle.ssl.file.buffer.size";
@@ -167,6 +183,9 @@ public class ShuffleHandler extends AuxiliaryService {
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
// 0 implies Netty default of 2 * number of available processors
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
+ boolean connectionKeepAliveEnabled = false;
+ int connectionKeepAliveTimeOut;
+ int mapOutputMetaInfoCacheSize;
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@@ -328,6 +347,15 @@ public class ShuffleHandler extends AuxiliaryService {
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+ connectionKeepAliveEnabled =
+ conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
+ DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
+ connectionKeepAliveTimeOut =
+ Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
+ DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
+ mapOutputMetaInfoCacheSize =
+ Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
+ DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
}
@Override
@@ -459,6 +487,15 @@ public class ShuffleHandler extends AuxiliaryService {
}
final Map> q =
new QueryStringDecoder(request.getUri()).getParameters();
+ final List keepAliveList = q.get("keepAlive");
+ boolean keepAliveParam = false;
+ if (keepAliveList != null && keepAliveList.size() == 1) {
+ keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("KeepAliveParam : " + keepAliveList
+ + " : " + keepAliveParam);
+ }
+ }
final List mapIds = splitMaps(q.get("map"));
final List reduceQ = q.get("reduce");
final List jobQ = q.get("job");
@@ -466,7 +503,8 @@ public class ShuffleHandler extends AuxiliaryService {
LOG.debug("RECV: " + request.getUri() +
"\n mapId: " + mapIds +
"\n reduceId: " + reduceQ +
- "\n jobId: " + jobQ);
+ "\n jobId: " + jobQ +
+ "\n keepAlive: " + keepAliveParam);
}
if (mapIds == null || reduceQ == null || jobQ == null) {
@@ -505,27 +543,46 @@ public class ShuffleHandler extends AuxiliaryService {
return;
}
+ Map mapOutputInfoMap =
+ new HashMap();
Channel ch = evt.getChannel();
+ String user = userRsrc.get(jobId);
+
+ // $x/$user/appcache/$appId/output/$mapId
+ // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
+ // between App and Job
+ String outputBasePathStr = getBaseLocation(jobId, user);
+
+ try {
+ populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
+ response, keepAliveParam, mapOutputInfoMap);
+ } catch(IOException e) {
+ ch.write(response);
+ LOG.error("Shuffle error in populating headers :", e);
+ String errorMessage = getErrorMessage(e);
+ sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
+ return;
+ }
ch.write(response);
// TODO refactor the following into the pipeline
ChannelFuture lastMap = null;
for (String mapId : mapIds) {
try {
+ MapOutputInfo info = mapOutputInfoMap.get(mapId);
+ if (info == null) {
+ info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+ }
lastMap =
- sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
+ sendMapOutput(ctx, ch, user, mapId,
+ reduceId, info);
if (null == lastMap) {
sendError(ctx, NOT_FOUND);
return;
}
} catch (IOException e) {
LOG.error("Shuffle error :", e);
- StringBuffer sb = new StringBuffer(e.getMessage());
- Throwable t = e;
- while (t.getCause() != null) {
- sb.append(t.getCause().getMessage());
- t = t.getCause();
- }
- sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR);
+ String errorMessage = getErrorMessage(e);
+ sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
return;
}
}
@@ -533,6 +590,99 @@ public class ShuffleHandler extends AuxiliaryService {
lastMap.addListener(ChannelFutureListener.CLOSE);
}
+ private String getErrorMessage(Throwable t) {
+ StringBuffer sb = new StringBuffer(t.getMessage());
+ while (t.getCause() != null) {
+ sb.append(t.getCause().getMessage());
+ t = t.getCause();
+ }
+ return sb.toString();
+ }
+
+ private String getBaseLocation(String jobId, String user) {
+ final JobID jobID = JobID.forName(jobId);
+ final ApplicationId appID =
+ ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
+ jobID.getId());
+ final String baseStr =
+ ContainerLocalizer.USERCACHE + "/" + user + "/"
+ + ContainerLocalizer.APPCACHE + "/"
+ + ConverterUtils.toString(appID) + "/output" + "/";
+ return baseStr;
+ }
+
+ protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+ int reduce, String user) throws IOException {
+ // Index file
+ Path indexFileName =
+ lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+ IndexRecord info =
+ indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+
+ Path mapOutputFileName =
+ lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+ }
+ MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+ return outputInfo;
+ }
+
+ protected void populateHeaders(List mapIds, String outputBaseStr,
+ String user, int reduce, HttpRequest request, HttpResponse response,
+ boolean keepAliveParam, Map mapOutputInfoMap)
+ throws IOException {
+
+ long contentLength = 0;
+ for (String mapId : mapIds) {
+ String base = outputBaseStr + mapId;
+ MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+ if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
+ mapOutputInfoMap.put(mapId, outputInfo);
+ }
+ // Index file
+ Path indexFileName =
+ lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+ IndexRecord info =
+ indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+ ShuffleHeader header =
+ new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+
+ contentLength += info.partLength;
+ contentLength += dob.getLength();
+ }
+
+ // Now set the response headers.
+ setResponseHeaders(response, keepAliveParam, contentLength);
+ }
+
+ protected void setResponseHeaders(HttpResponse response,
+ boolean keepAliveParam, long contentLength) {
+ if (!connectionKeepAliveEnabled && !keepAliveParam) {
+ LOG.info("Setting connection close header...");
+ response.setHeader(HttpHeaders.CONNECTION, CONNECTION_CLOSE);
+ } else {
+ response.setHeader(HttpHeaders.CONTENT_LENGTH,
+ String.valueOf(contentLength));
+ response.setHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
+ response.setHeader(HttpHeaders.KEEP_ALIVE, "timeout="
+ + connectionKeepAliveTimeOut);
+ LOG.info("Content Length in shuffle : " + contentLength);
+ }
+ }
+
+ class MapOutputInfo {
+ final Path mapOutputFileName;
+ final IndexRecord indexRecord;
+
+ MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
+ this.mapOutputFileName = mapOutputFileName;
+ this.indexRecord = indexRecord;
+ }
+ }
+
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
@@ -575,39 +725,16 @@ public class ShuffleHandler extends AuxiliaryService {
}
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
- String user, String jobId, String mapId, int reduce)
+ String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
throws IOException {
- // TODO replace w/ rsrc alloc
- // $x/$user/appcache/$appId/output/$mapId
- // TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job
- JobID jobID = JobID.forName(jobId);
- ApplicationId appID = ApplicationId.newInstance(
- Long.parseLong(jobID.getJtIdentifier()), jobID.getId());
- final String base =
- ContainerLocalizer.USERCACHE + "/" + user + "/"
- + ContainerLocalizer.APPCACHE + "/"
- + ConverterUtils.toString(appID) + "/output" + "/" + mapId;
- if (LOG.isDebugEnabled()) {
- LOG.debug("DEBUG0 " + base);
- }
- // Index file
- Path indexFileName = lDirAlloc.getLocalPathToRead(
- base + "/file.out.index", conf);
- // Map-output file
- Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
- base + "/file.out", conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : "
- + indexFileName);
- }
- final IndexRecord info =
- indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+ final IndexRecord info = mapOutputInfo.indexRecord;
final ShuffleHeader header =
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
final DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
- final File spillfile = new File(mapOutputFileName.toString());
+ final File spillfile =
+ new File(mapOutputInfo.mapOutputFileName.toString());
RandomAccessFile spill;
try {
spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 1b078bdd104..d03dd2ba295 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MockitoMaker.make;
import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
@@ -39,6 +41,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
@@ -69,17 +72,24 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.Assert;
import org.junit.Test;
-
+import org.mortbay.jetty.HttpHeaders;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
+ /**
+ * Test the validation of ShuffleHandler's meta-data's serialization and
+ * de-serialization.
+ *
+ * @throws Exception exception
+ */
@Test (timeout = 10000)
public void testSerializeMeta() throws Exception {
assertEquals(1, ShuffleHandler.deserializeMetaData(
@@ -90,6 +100,11 @@ public class TestShuffleHandler {
ShuffleHandler.serializeMetaData(8080)));
}
+ /**
+ * Validate shuffle connection and input/output metrics.
+ *
+ * @throws Exception exception
+ */
@Test (timeout = 10000)
public void testShuffleMetrics() throws Exception {
MetricsSystem ms = new MetricsSystemImpl();
@@ -120,6 +135,11 @@ public class TestShuffleHandler {
assertGauge("ShuffleConnections", connections, rb);
}
+ /**
+ * Verify client prematurely closing a connection.
+ *
+ * @throws Exception exception.
+ */
@Test (timeout = 10000)
public void testClientClosesConnection() throws Exception {
final ArrayList failures = new ArrayList(1);
@@ -130,6 +150,20 @@ public class TestShuffleHandler {
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
+ @Override
+ protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+ int reduce, String user) throws IOException {
+ return null;
+ }
+ @Override
+ protected void populateHeaders(List mapIds, String jobId,
+ String user, int reduce, HttpRequest request,
+ HttpResponse response, boolean keepAliveParam,
+ Map infoMap) throws IOException {
+ // Only set response headers and skip everything else
+ // send some dummy value for content-length
+ super.setResponseHeaders(response, keepAliveParam, 100);
+ }
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
@@ -137,7 +171,8 @@ public class TestShuffleHandler {
}
@Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
- Channel ch, String user, String jobId, String mapId, int reduce)
+ Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo info)
throws IOException {
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
@@ -147,7 +182,7 @@ public class TestShuffleHandler {
header.write(dob);
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
- for (int i=0; i<100000; ++i) {
+ for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
@@ -187,6 +222,7 @@ public class TestShuffleHandler {
conn.connect();
DataInputStream input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ Assert.assertEquals("close", conn.getHeaderField(HttpHeaders.CONNECTION));
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
input.close();
@@ -196,6 +232,147 @@ public class TestShuffleHandler {
failures.size() == 0);
}
+ @Test(timeout = 10000)
+ public void testKeepAlive() throws Exception {
+ final ArrayList failures = new ArrayList(1);
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
+    // Try setting a negative keep-alive timeout; it should be clamped to >= 1.
+ conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
+ @Override
+ protected Shuffle getShuffle(final Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing
+ return new Shuffle(conf) {
+ @Override
+ protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+ int reduce, String user) throws IOException {
+ return null;
+ }
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ }
+
+ @Override
+ protected void populateHeaders(List mapIds, String jobId,
+ String user, int reduce, HttpRequest request,
+ HttpResponse response, boolean keepAliveParam,
+ Map infoMap) throws IOException {
+ // Send some dummy data (populate content length details)
+ ShuffleHeader header =
+ new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ dob = new DataOutputBuffer();
+ for (int i = 0; i < 100000; ++i) {
+ header.write(dob);
+ }
+
+ long contentLength = dob.getLength();
+          // For testing purposes:
+          // disable connectionKeepAliveEnabled if keepAliveParam is available
+ if (keepAliveParam) {
+ connectionKeepAliveEnabled = false;
+ }
+
+ super.setResponseHeaders(response, keepAliveParam, contentLength);
+ }
+
+ @Override
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+ Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo info) throws IOException {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+
+ // send a shuffle header and a lot of data down the channel
+ // to trigger a broken pipe
+ ShuffleHeader header =
+ new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ dob = new DataOutputBuffer();
+ for (int i = 0; i < 100000; ++i) {
+ header.write(dob);
+ }
+ return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ }
+
+ @Override
+ protected void sendError(ChannelHandlerContext ctx,
+ HttpResponseStatus status) {
+ if (failures.size() == 0) {
+ failures.add(new Error());
+ ctx.getChannel().close();
+ }
+ }
+
+ @Override
+ protected void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ if (failures.size() == 0) {
+ failures.add(new Error());
+ ctx.getChannel().close();
+ }
+ }
+ };
+ }
+ };
+ shuffleHandler.init(conf);
+ shuffleHandler.start();
+
+ String shuffleBaseURL = "http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+ URL url =
+ new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ + "map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ DataInputStream input = new DataInputStream(conn.getInputStream());
+ Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
+ conn.getHeaderField(HttpHeaders.CONNECTION));
+ Assert.assertEquals("timeout=1",
+ conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ ShuffleHeader header = new ShuffleHeader();
+ header.readFields(input);
+ input.close();
+
+ // For keepAlive via URL
+ url =
+ new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
+ + "map=attempt_12345_1_m_1_0&keepAlive=true");
+ conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ input = new DataInputStream(conn.getInputStream());
+ Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
+ conn.getHeaderField(HttpHeaders.CONNECTION));
+ Assert.assertEquals("timeout=1",
+ conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
+ Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+ header = new ShuffleHeader();
+ header.readFields(input);
+ input.close();
+ }
+
+ /**
+   * Simulate a reducer that sends an invalid shuffle header - sometimes a wrong
+   * header_name and sometimes a wrong version.
+ *
+ * @throws Exception exception
+ */
@Test (timeout = 10000)
public void testIncompatibleShuffleVersion() throws Exception {
final int failureNum = 3;
@@ -224,7 +401,12 @@ public class TestShuffleHandler {
shuffleHandler.stop();
shuffleHandler.close();
}
-
+
+ /**
+ * Validate the limit on number of shuffle connections.
+ *
+ * @throws Exception exception
+ */
@Test (timeout = 10000)
public void testMaxConnections() throws Exception {
@@ -236,14 +418,29 @@ public class TestShuffleHandler {
protected Shuffle getShuffle(Configuration conf) {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
+ @Override
+ protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+ int reduce, String user) throws IOException {
+ // Do nothing.
+ return null;
+ }
+ @Override
+ protected void populateHeaders(List mapIds, String jobId,
+ String user, int reduce, HttpRequest request,
+ HttpResponse response, boolean keepAliveParam,
+ Map infoMap) throws IOException {
+ // Do nothing.
+ }
@Override
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
+ // Do nothing.
}
@Override
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
- Channel ch, String user, String jobId, String mapId, int reduce)
+ Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo info)
throws IOException {
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
@@ -308,7 +505,13 @@ public class TestShuffleHandler {
shuffleHandler.stop();
}
-
+
+ /**
+ * Validate the ownership of the map-output files being pulled in. The
+   * local-file-system owner of the file should match the user component in the
+   * shuffle request URL.
+ * @throws Exception exception
+ */
@Test(timeout = 100000)
public void testMapFileAccess() throws IOException {
// This will run only in NativeIO is enabled as SecureIOUtils need it
@@ -323,7 +526,7 @@ public class TestShuffleHandler {
TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
ApplicationId appId = ApplicationId.newInstance(12345, 1);
- System.out.println(appId.toString());
+ LOG.info(appId.toString());
String appAttemptId = "attempt_12345_1_m_1_0";
String user = "randomUser";
String reducerId = "0";
@@ -341,6 +544,7 @@ public class TestShuffleHandler {
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
+ // Do nothing.
}
};
@@ -393,7 +597,7 @@ public class TestShuffleHandler {
}
}
- public static void createShuffleHandlerFiles(File logDir, String user,
+ private static void createShuffleHandlerFiles(File logDir, String user,
String appId, String appAttemptId, Configuration conf,
List fileMap) throws IOException {
String attemptDir =
@@ -412,8 +616,8 @@ public class TestShuffleHandler {
createMapOutputFile(mapOutputFile, conf);
}
- public static void
- createMapOutputFile(File mapOutputFile, Configuration conf)
+ private static void
+ createMapOutputFile(File mapOutputFile, Configuration conf)
throws IOException {
FileOutputStream out = new FileOutputStream(mapOutputFile);
out.write("Creating new dummy map output file. Used only for testing"
@@ -422,7 +626,7 @@ public class TestShuffleHandler {
out.close();
}
- public static void createIndexFile(File indexFile, Configuration conf)
+ private static void createIndexFile(File indexFile, Configuration conf)
throws IOException {
if (indexFile.exists()) {
System.out.println("Deleting existing file");