MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du

(cherry picked from commit 177e8090f5)
This commit is contained in:
Jason Lowe 2014-11-13 15:42:25 +00:00
parent 9c94945fe9
commit e62a5f9d0a
2 changed files with 33 additions and 9 deletions

View File

@ -246,6 +246,9 @@ Release 2.6.0 - 2014-11-15
MAPREDUCE-5958. Wrong reduce task progress if map output is compressed MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
(Emilio Coppa and jlowe via kihwal) (Emilio Coppa and jlowe via kihwal)
MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
correctly (Junping Du via jlowe)
Release 2.5.2 - 2014-11-10 Release 2.5.2 - 2014-11-10
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -678,28 +678,49 @@ class Fetcher<K,V> extends Thread {
} else if (connectionTimeout > 0) { } else if (connectionTimeout > 0) {
unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout); unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
} }
long startTime = Time.monotonicNow();
long lastTime = startTime;
int attempts = 0;
// set the connect timeout to the unit-connect-timeout // set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit); connection.setConnectTimeout(unit);
while (true) { while (true) {
try { try {
attempts++;
connection.connect(); connection.connect();
break; break;
} catch (IOException ioe) { } catch (IOException ioe) {
// update the total remaining connect-timeout long currentTime = Time.monotonicNow();
connectionTimeout -= unit; long retryTime = currentTime - startTime;
long leftTime = connectionTimeout - retryTime;
long timeSinceLastIteration = currentTime - lastTime;
// throw an exception if we have waited for timeout amount of time // throw an exception if we have waited for timeout amount of time
// note that the updated value if timeout is used here // note that the updated value if timeout is used here
if (connectionTimeout == 0) { if (leftTime <= 0) {
int retryTimeInSeconds = (int) retryTime/1000;
LOG.error("Connection retry failed with " + attempts +
" attempts in " + retryTimeInSeconds + " seconds");
throw ioe; throw ioe;
} }
// reset the connect timeout for the last try // reset the connect timeout for the last try
if (connectionTimeout < unit) { if (leftTime < unit) {
unit = connectionTimeout; unit = (int)leftTime;
// reset the connect time out for the final connect // reset the connect time out for the final connect
connection.setConnectTimeout(unit); connection.setConnectTimeout(unit);
} }
if (timeSinceLastIteration < unit) {
try {
// sleep the left time of unit
sleep(unit - timeSinceLastIteration);
} catch (InterruptedException e) {
LOG.warn("Sleep in connection retry get interrupted.");
if (stopped) {
return;
}
}
}
// update the total remaining connect-timeout
lastTime = Time.monotonicNow();
} }
} }
} }