From 2977cbfc83a6d9455b328dee90443b7c80f3b208 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Wed, 26 Jun 2013 05:50:23 +0000 Subject: [PATCH] Merge -c 1496741 from trunk to branch-2 to fix MAPREDUCE-5326. Added version to shuffle header. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1496742 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/task/reduce/Fetcher.java | 13 ++- .../mapreduce/task/reduce/ShuffleHeader.java | 6 ++ .../mapreduce/task/reduce/TestFetcher.java | 94 ++++++++++++++----- .../apache/hadoop/mapred/ShuffleHandler.java | 18 ++++ .../hadoop/mapred/TestShuffleHandler.java | 41 ++++++++ 6 files changed, 151 insertions(+), 24 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index dd5d11a9c61..3a4857a696d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -183,6 +183,9 @@ Release 2.1.0-beta - UNRELEASED MAPREDUCE-5194. Heed interrupts during Fetcher shutdown. (cdouglas) + MAPREDUCE-5326. Added version to shuffle header. (Zhijie Shen via + acmurthy) + OPTIMIZATIONS MAPREDUCE-4974. Optimising the LineRecordReader initialize() method diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 4e9059fff25..4f1e5d14efa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -275,6 +275,11 @@ class Fetcher extends Thread { SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); // set the read timeout connection.setReadTimeout(readTimeout); + // put shuffle version into http header + connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); connect(connection, connectionTimeout); // verify that the thread wasn't stopped during calls to connect if (stopped) { @@ -290,7 +295,13 @@ class Fetcher extends Thread { "Got invalid response code " + rc + " from " + url + ": " + connection.getResponseMessage()); } - + // get the shuffle version + if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( + connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( + connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) { + throw new IOException("Incompatible shuffle response version"); + } // get the replyHash which is HMac of the encHash we sent to the server String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH); if(replyHash==null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java index 796608bbb6e..b42c018427d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java @@ -36,6 +36,12 @@ import org.apache.hadoop.io.WritableUtils; @InterfaceStability.Stable public class ShuffleHeader implements Writable { + /** Header info of the shuffle http request/response */ + public static final String HTTP_HEADER_NAME = "name"; + public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce"; + public static final String HTTP_HEADER_VERSION = "version"; + public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0"; + /** * The longest possible length of task attempt id that we will accept. */ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java index fdc9d98e539..59839408613 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java @@ -124,9 +124,8 @@ public class TestFetcher { underTest.copyFromHost(host); - verify(connection) - .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, - encHash); + verify(connection).addRequestProperty( + SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); verify(allErrs).increment(1); verify(ss).copyFailed(map1ID, host, false, false); @@ -144,17 +143,20 @@ public class TestFetcher { String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) - .thenReturn(replyHash); + .thenReturn(replyHash); ByteArrayInputStream in = new ByteArrayInputStream( "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes()); when(connection.getInputStream()).thenReturn(in); underTest.copyFromHost(host); - verify(connection) - .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, - encHash); + verify(connection).addRequestProperty( + SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); verify(allErrs).increment(1); verify(ss).copyFailed(map1ID, host, true, false); @@ -164,6 +166,37 @@ public class TestFetcher { verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); } + @Test + public void testCopyFromHostIncompatibleShuffleVersion() throws Exception { + String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); + + when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn("mapreduce").thenReturn("other").thenReturn("other"); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn("1.0.1").thenReturn("1.0.0").thenReturn("1.0.1"); + when(connection.getHeaderField( + SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); + ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]); + when(connection.getInputStream()).thenReturn(in); + + for (int i = 0; i < 3; ++i) { + Fetcher underTest = new FakeFetcher(job, id, ss, mm, + r, metrics, except, key, connection); + underTest.copyFromHost(host); + } + + verify(connection, times(3)).addRequestProperty( + SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); + + verify(allErrs, times(3)).increment(1); + verify(ss, times(3)).copyFailed(map1ID, host, false, false); + verify(ss, times(3)).copyFailed(map2ID, host, false, false); + + verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID)); + verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); + } + @Test public void testCopyFromHostWait() throws Exception { Fetcher underTest = new FakeFetcher(job, id, ss, mm, @@ -173,20 +206,24 @@ public class TestFetcher { when(connection.getResponseCode()).thenReturn(200); when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) - .thenReturn(replyHash); + .thenReturn(replyHash); ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); ByteArrayOutputStream bout = new ByteArrayOutputStream(); header.write(new DataOutputStream(bout)); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); when(connection.getInputStream()).thenReturn(in); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); //Defaults to null, which is what we want to test when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) - .thenReturn(null); + .thenReturn(null); underTest.copyFromHost(host); verify(connection) - .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); verify(allErrs, never()).increment(1); verify(ss, never()).copyFailed(map1ID, host, true, false); @@ -208,24 +245,27 @@ public class TestFetcher { when(connection.getResponseCode()).thenReturn(200); when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) - .thenReturn(replyHash); + .thenReturn(replyHash); ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); ByteArrayOutputStream bout = new ByteArrayOutputStream(); header.write(new DataOutputStream(bout)); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); when(connection.getInputStream()).thenReturn(in); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) - .thenReturn(immo); + .thenReturn(immo); - doThrow(new java.lang.InternalError()) - .when(immo) - .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), - anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); + doThrow(new java.lang.InternalError()).when(immo) + .shuffle(any(MapHost.class), any(InputStream.class), anyLong(), + anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); underTest.copyFromHost(host); verify(connection) - .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, + .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); verify(ss, times(1)).copyFailed(map1ID, host, true, false); } @@ -236,19 +276,23 @@ public class TestFetcher { InMemoryMapOutput immo = spy(new InMemoryMapOutput( job, id, mm, 100, null, true)); when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) - .thenReturn(immo); + .thenReturn(immo); doNothing().when(mm).waitForResource(); when(ss.getHost()).thenReturn(host); String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); when(connection.getResponseCode()).thenReturn(200); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) - .thenReturn(replyHash); + .thenReturn(replyHash); ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); ByteArrayOutputStream bout = new ByteArrayOutputStream(); header.write(new DataOutputStream(bout)); final StuckInputStream in = - new StuckInputStream(new ByteArrayInputStream(bout.toByteArray())); + new StuckInputStream(new ByteArrayInputStream(bout.toByteArray())); when(connection.getInputStream()).thenReturn(in); doAnswer(new Answer() { public Void answer(InvocationOnMock ignore) throws IOException { @@ -280,20 +324,24 @@ public class TestFetcher { OnDiskMapOutput odmo = spy(new OnDiskMapOutput(map1ID, id, mm, 100L, job, mof, FETCHER, true, mFs, p)); when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) - .thenReturn(odmo); + .thenReturn(odmo); doNothing().when(mm).waitForResource(); when(ss.getHost()).thenReturn(host); String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); when(connection.getResponseCode()).thenReturn(200); when(connection.getHeaderField( - SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); + SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash); ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); ByteArrayOutputStream bout = new ByteArrayOutputStream(); header.write(new DataOutputStream(bout)); final StuckInputStream in = - new StuckInputStream(new ByteArrayInputStream(bout.toByteArray())); + new StuckInputStream(new ByteArrayInputStream(bout.toByteArray())); when(connection.getInputStream()).thenReturn(in); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION)) + .thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); doAnswer(new Answer() { public Void answer(InvocationOnMock ignore) throws IOException { in.close(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 27b57dd262b..f9d1203ba4d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -23,6 +23,7 @@ import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static org.jboss.netty.handler.codec.http.HttpMethod.GET; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; @@ -438,6 +439,13 @@ public class ShuffleHandler extends AuxiliaryService { sendError(ctx, METHOD_NOT_ALLOWED); return; } + // Check whether the shuffle version is compatible + if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( + request.getHeader(ShuffleHeader.HTTP_HEADER_NAME)) + || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( + request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) { + sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST); + } final Map> q = new QueryStringDecoder(request.getUri()).getParameters(); final List mapIds = splitMaps(q.get("map")); @@ -543,6 +551,11 @@ public class ShuffleHandler extends AuxiliaryService { SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), tokenSecret); response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); + // Put shuffle version into http header + response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); if (LOG.isDebugEnabled()) { int len = reply.length(); LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" + @@ -627,6 +640,11 @@ public class ShuffleHandler extends AuxiliaryService { HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + // Put shuffle version into http header + response.setHeader(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); response.setContent( ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index 813781fa029..cf786b9fe21 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -180,6 +180,10 @@ public class TestShuffleHandler { + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); conn.connect(); DataInputStream input = new DataInputStream(conn.getInputStream()); Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); @@ -191,6 +195,35 @@ public class TestShuffleHandler { Assert.assertTrue("sendError called when client closed connection", failures.size() == 0); } + + @Test (timeout = 10000) + public void testIncompatibleShuffleVersion() throws Exception { + final int failureNum = 3; + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + ShuffleHandler shuffleHandler = new ShuffleHandler(); + shuffleHandler.init(conf); + shuffleHandler.start(); + + // simulate a reducer that closes early by reading a single shuffle header + // then closing the connection + URL url = new URL("http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + for (int i = 0; i < failureNum; ++i) { + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + i == 0 ? "mapreduce" : "other"); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + i == 1 ? "1.0.0" : "1.0.1"); + conn.connect(); + Assert.assertEquals( + HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); + } + + shuffleHandler.stop(); + shuffleHandler.close(); + } @Test (timeout = 10000) public void testMaxConnections() throws Exception { @@ -242,6 +275,10 @@ public class TestShuffleHandler { + i + "_0"; URL url = new URL(URLstring); conns[i] = (HttpURLConnection)url.openConnection(); + conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); } // Try to open numerous connections @@ -330,6 +367,10 @@ public class TestShuffleHandler { + "/mapOutput?job=job_12345_0001&reduce=" + reducerId + "&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); conn.connect(); byte[] byteArr = new byte[10000]; try {