YARN-11510. [Federation] Fix NodeManager#TestFederationInterceptor Flaky Unit Test.

This commit is contained in:
slfan1989 2023-06-10 20:13:50 +08:00
parent 35158db711
commit 5d847ebe79
4 changed files with 39 additions and 2 deletions

View File

@ -3075,6 +3075,10 @@ public class YarnConfiguration extends Configuration {
+ "amrmproxy.enabled"; + "amrmproxy.enabled";
public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false; public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
/**
 * Configuration key for a boolean flag that FederationInterceptor reads into
 * its waitUamRegisterDone field. When the flag is true, the interceptor waits
 * for every pending UAM register future to complete before it returns the
 * list of new sub-clusters.
 */
public static final String AMRM_PROXY_WAIT_UAM_REGISTER_DONE =
NM_PREFIX + "amrmproxy.wait.uam-register.done";
/** Default for {@link #AMRM_PROXY_WAIT_UAM_REGISTER_DONE}: do not wait. */
public static final boolean DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE = false;
public static final String AMRM_PROXY_ADDRESS = NM_PREFIX public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
+ "amrmproxy.address"; + "amrmproxy.address";
public static final int DEFAULT_AMRM_PROXY_PORT = 8049; public static final int DEFAULT_AMRM_PROXY_PORT = 8049;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server; package org.apache.hadoop.yarn.server;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.ArrayList; import java.util.ArrayList;
@ -183,7 +184,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
* change the implementation with care. * change the implementation with care.
*/ */
public class MockResourceManagerFacade implements ApplicationClientProtocol, public class MockResourceManagerFacade implements ApplicationClientProtocol,
ApplicationMasterProtocol, ResourceManagerAdministrationProtocol { ApplicationMasterProtocol, ResourceManagerAdministrationProtocol, Closeable {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(MockResourceManagerFacade.class); LoggerFactory.getLogger(MockResourceManagerFacade.class);
@ -967,4 +968,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() { public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
return applicationContainerIdMap; return applicationContainerIdMap;
} }
/**
 * Marks this mock resource manager as no longer running by clearing the
 * isRunning flag. No other resources are released here.
 *
 * NOTE(review): presumably the protocol methods of this class check
 * isRunning and reject calls once it is false; confirm against the rest of
 * the class.
 *
 * @throws IOException declared by {@link Closeable#close()}; this
 *         implementation does not throw it.
 */
@Override
public void close() throws IOException {
isRunning = false;
}
} }

View File

@ -251,6 +251,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// the maximum wait time for the first async heart beat response // the maximum wait time for the first async heart beat response
private long heartbeatMaxWaitTimeMs; private long heartbeatMaxWaitTimeMs;
// whether to wait for all pending UAM register futures to complete before
// returning newly registered sub-clusters; loaded from
// YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE
private boolean waitUamRegisterDone;
private MonotonicClock clock = new MonotonicClock(); private MonotonicClock clock = new MonotonicClock();
/** /**
@ -353,6 +355,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.subClusterTimeOut = this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT; YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
} }
this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
} }
@Override @Override
@ -1332,6 +1336,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}); });
this.uamRegisterFutures.put(scId, future); this.uamRegisterFutures.put(scId, future);
} }
// When enabled through configuration, wait for every pending UAM register
// future to complete before returning the new sub-clusters.
if (this.waitUamRegisterDone) {
  for (Map.Entry<SubClusterId, Future<?>> entry : this.uamRegisterFutures.entrySet()) {
    SubClusterId subClusterId = entry.getKey();
    Future<?> future = entry.getValue();
    LOG.info("subClusterId {} Wait Uam Register done.", subClusterId);
    try {
      // Block on the future rather than polling isDone() in a tight loop,
      // which would pin a CPU core and emit one log line per iteration.
      future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt status for the caller and stop waiting; any
      // further get() on this thread would fail immediately anyway.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted while waiting for UAM register of subClusterId {}.",
          subClusterId, e);
      break;
    } catch (java.util.concurrent.ExecutionException
        | java.util.concurrent.CancellationException e) {
      // A failed or cancelled future is still "done"; the goal here is only
      // to wait for completion, so log the outcome and move on.
      LOG.warn("UAM register for subClusterId {} did not complete normally.",
          subClusterId, e);
    }
  }
}
return newSubClusters; return newSubClusters;
} }

View File

@ -175,6 +175,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT, conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500); 500);
// Wait for UAM registration to be done
conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);
return conf; return conf;
} }
@ -590,6 +593,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.recover(recoveredDataMap); interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Waiting for SC-1 to time out.
Thread.sleep(800);
// SC1 should be initialized to be timed out // SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
@ -848,7 +855,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
List<Container> containers = List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2); getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) { for (Container c : containers) {
LOG.info("Allocated container " + c.getId()); LOG.info("Allocated container {}", c.getId());
} }
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
@ -882,6 +889,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
int numberOfContainers = 3; int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers // Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Waiting for SC-1 to time out.
Thread.sleep(800);
// SC1 should be initialized to be timed out // SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size()); Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers, Assert.assertEquals(numberOfContainers,