From e62a5f9d0af481eb4cba55d39253f64a1abdff1b Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 13 Nov 2014 15:42:25 +0000 Subject: [PATCH] MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du (cherry picked from commit 177e8090f5809beb3ebcb656cd0affbb3f487de8) --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/task/reduce/Fetcher.java | 39 ++++++++++++++----- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2caa82829f8..e81d24fdf09 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -246,6 +246,9 @@ Release 2.6.0 - 2014-11-15 MAPREDUCE-5958. Wrong reduce task progress if map output is compressed (Emilio Coppa and jlowe via kihwal) + MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused + correctly (Junping Du via jlowe) + Release 2.5.2 - 2014-11-10 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java index 796394f20e8..3f408531b2d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java @@ -407,7 +407,7 @@ class Fetcher extends Thread { } if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) { LOG.warn("Failed to connect to host: " + url + "after " - + fetchRetryTimeout + "milliseconds."); + + fetchRetryTimeout + " milliseconds."); throw e; } try { @@ -596,7 +596,7 @@ class Fetcher extends Thread { } else { // timeout, prepare to be failed. LOG.warn("Timeout for copying MapOutput with retry on host " + host - + "after " + fetchRetryTimeout + "milliseconds."); + + "after " + fetchRetryTimeout + " milliseconds."); } } @@ -678,28 +678,49 @@ class Fetcher extends Thread { } else if (connectionTimeout > 0) { unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout); } + long startTime = Time.monotonicNow(); + long lastTime = startTime; + int attempts = 0; // set the connect timeout to the unit-connect-timeout connection.setConnectTimeout(unit); while (true) { try { + attempts++; connection.connect(); break; } catch (IOException ioe) { - // update the total remaining connect-timeout - connectionTimeout -= unit; - + long currentTime = Time.monotonicNow(); + long retryTime = currentTime - startTime; + long leftTime = connectionTimeout - retryTime; + long timeSinceLastIteration = currentTime - lastTime; // throw an exception if we have waited for timeout amount of time // note that the updated value if timeout is used here - if (connectionTimeout == 0) { + if (leftTime <= 0) { + int retryTimeInSeconds = (int) retryTime/1000; + LOG.error("Connection retry failed with " + attempts + + " attempts in " + retryTimeInSeconds + " seconds"); throw ioe; } - // reset the connect timeout for the last try - if (connectionTimeout < unit) { - unit = connectionTimeout; + if (leftTime < unit) { + unit = (int)leftTime; // reset the connect time out for the final connect connection.setConnectTimeout(unit); } + + if (timeSinceLastIteration < unit) { + try { + // sleep the left time of unit + sleep(unit - timeSinceLastIteration); + } catch (InterruptedException e) { + LOG.warn("Sleep in connection retry get interrupted."); + if (stopped) { + return; + } + } + } + // update the total remaining connect-timeout + lastTime = Time.monotonicNow(); } } }