From ee9b80acf84718b952cbb82380cd0a4dbe22dc79 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 21 Oct 2015 21:43:42 +0000 Subject: [PATCH] MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections. Contributed by Chang Li --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../apache/hadoop/mapred/ShuffleHandler.java | 1 + .../hadoop/mapred/TestShuffleHandler.java | 59 +++++++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 62807af57c3..c29a92c4273 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -650,6 +650,9 @@ Release 2.7.2 - UNRELEASED TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt via jlowe) + MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections (Chang Li via + jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 7a078a87f9f..d24b3a45b71 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -494,6 +494,7 @@ public class ShuffleHandler extends AuxiliaryService { } catch (Exception ex) { throw new RuntimeException(ex); } + bootstrap.setOption("child.keepAlive", true); bootstrap.setPipelineFactory(pipelineFact); port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT); Channel ch = bootstrap.bind(new InetSocketAddress(port)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index f16fe5a43ac..25a622b2618 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.records.Version; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; @@ -140,6 +142,27 @@ public class TestShuffleHandler { } } + private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler { + boolean socketKeepAlive = false; + + @Override + protected Shuffle getShuffle(final Configuration conf) { + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + SocketChannel channel = (SocketChannel)(ctx.getChannel()); + socketKeepAlive = channel.getConfig().isKeepAlive(); + } + }; + } + + protected boolean isSocketKeepAlive() { + return socketKeepAlive; + } + } + /** * Test the validation of ShuffleHandler's meta-data's serialization and * de-serialization. @@ -423,6 +446,42 @@ public class TestShuffleHandler { input.close(); } + @Test(timeout = 10000) + public void testSocketKeepAlive() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); + // try setting to -ve keep alive timeout. + conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); + HttpURLConnection conn = null; + MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2(); + try { + shuffleHandler.init(conf); + shuffleHandler.start(); + + String shuffleBaseURL = "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); + URL url = + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + + "map=attempt_12345_1_m_1_0"); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + conn.getInputStream(); + Assert.assertTrue("socket should be set KEEP_ALIVE", + shuffleHandler.isSocketKeepAlive()); + } finally { + if (conn != null) { + conn.disconnect(); + } + shuffleHandler.stop(); + } + } + /** * simulate a reducer that sends an invalid shuffle-header - sometimes a wrong * header_name and sometimes a wrong version