YARN-10479. RMProxy should retry on SocketTimeout Exceptions. Contributed by Jim Brennan (Jim_Brennan)
(cherry picked from commit 55339c2bdd
)
This commit is contained in:
parent
4cec4bebd9
commit
c7b1bdba63
|
@ -24,6 +24,7 @@ import java.net.ConnectException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.NoRouteToHostException;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.HashMap;
|
||||
|
@ -275,6 +276,7 @@ public class RMProxy<T> {
|
|||
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(SocketTimeoutException.class, retryPolicy);
|
||||
exceptionToPolicyMap.put(StandbyException.class, retryPolicy);
|
||||
// YARN-4288: local IOException is also possible.
|
||||
exceptionToPolicyMap.put(IOException.class, retryPolicy);
|
||||
|
|
|
@ -371,12 +371,15 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
private final long rmStartIntervalMS;
|
||||
private final boolean rmNeverStart;
|
||||
public ResourceTracker resourceTracker;
|
||||
private final boolean useSocketTimeoutEx;
|
||||
public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
|
||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||
long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
long rmStartIntervalMS, boolean rmNeverStart,
|
||||
boolean useSocketTimeoutEx) {
|
||||
super(context, dispatcher, healthChecker, metrics);
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
this.useSocketTimeoutEx = useSocketTimeoutEx;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -391,7 +394,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
HAUtil.isHAEnabled(conf));
|
||||
resourceTracker =
|
||||
(ResourceTracker) RetryProxy.create(ResourceTracker.class,
|
||||
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
|
||||
new MyResourceTracker6(rmStartIntervalMS, rmNeverStart,
|
||||
useSocketTimeoutEx),
|
||||
retryPolicy);
|
||||
return resourceTracker;
|
||||
}
|
||||
|
@ -824,11 +828,14 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
private long rmStartIntervalMS;
|
||||
private boolean rmNeverStart;
|
||||
private final long waitStartTime;
|
||||
private final boolean useSocketTimeoutEx;
|
||||
|
||||
public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) {
|
||||
MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart,
|
||||
boolean useSocketTimeoutEx) {
|
||||
this.rmStartIntervalMS = rmStartIntervalMS;
|
||||
this.rmNeverStart = rmNeverStart;
|
||||
this.waitStartTime = System.currentTimeMillis();
|
||||
this.useSocketTimeoutEx = useSocketTimeoutEx;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -837,8 +844,13 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
IOException {
|
||||
if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
|
||||
|| rmNeverStart) {
|
||||
throw new java.net.ConnectException("Faking RM start failure as start "
|
||||
+ "delay timer has not expired.");
|
||||
if (useSocketTimeoutEx) {
|
||||
throw new java.net.SocketTimeoutException(
|
||||
"Faking RM start failure as start delay timer has not expired.");
|
||||
} else {
|
||||
throw new java.net.ConnectException(
|
||||
"Faking RM start failure as start delay timer has not expired.");
|
||||
}
|
||||
} else {
|
||||
NodeId nodeId = request.getNodeId();
|
||||
Resource resource = request.getResource();
|
||||
|
@ -1340,8 +1352,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRM() throws Exception {
|
||||
private void testNMConnectionToRMInternal(boolean useSocketTimeoutEx)
|
||||
throws Exception {
|
||||
final long delta = 50000;
|
||||
final long connectionWaitMs = 5000;
|
||||
final long connectionRetryIntervalMs = 1000;
|
||||
|
@ -1360,7 +1372,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
||||
context, dispatcher, healthChecker, metrics,
|
||||
rmStartIntervalMS, true);
|
||||
rmStartIntervalMS, true, useSocketTimeoutEx);
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
};
|
||||
|
@ -1392,7 +1404,7 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
|
||||
context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
|
||||
false);
|
||||
false, useSocketTimeoutEx);
|
||||
return nodeStatusUpdater;
|
||||
}
|
||||
};
|
||||
|
@ -1423,6 +1435,16 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
(duration < (rmStartIntervalMS + delta)));
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRM() throws Exception {
|
||||
testNMConnectionToRMInternal(false);
|
||||
}
|
||||
|
||||
@Test (timeout = 150000)
|
||||
public void testNMConnectionToRMwithSocketTimeout() throws Exception {
|
||||
testNMConnectionToRMInternal(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that if for some reason NM fails to start ContainerManager RPC
|
||||
* server, RM is oblivious to NM's presence. The behaviour is like this
|
||||
|
|
Loading…
Reference in New Issue