YARN-8933. [AMRMProxy] Fix potential empty fields in allocation response, move SubClusterTimeout to FederationInterceptor. Contributed by Botong Huang.

This commit is contained in:
Botong Huang 2018-11-11 11:12:53 -08:00
parent 2664248797
commit b5ec85d966
12 changed files with 236 additions and 100 deletions

View File

@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -49,7 +50,8 @@ public void reinitialize(
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();

View File

@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -39,15 +40,16 @@ public interface FederationAMRMProxyPolicy
*
* @param resourceRequests the list of {@link ResourceRequest}s from the AM to
* be split
*
* @param timedOutSubClusters the set of sub-clusters that haven't had a
* successful heart-beat response for a while.
* @return map of sub-cluster as identified by {@link SubClusterId} to the
* list of {@link ResourceRequest}s that should be forwarded to it
*
* @throws YarnException in case the request is malformed or no viable
* sub-clusters can be found.
*/
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException;
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException;
/**
* This method should be invoked to notify the policy about responses being

View File

@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -55,8 +56,8 @@ public void reinitialize(
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
if (homeSubcluster == null) {
throw new FederationPolicyException("No home subcluster available");
}

View File

@ -34,7 +34,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
@ -132,11 +131,8 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
private SubClusterResolver resolver;
private Map<SubClusterId, Resource> headroom;
private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
private long subClusterTimeOut;
private float hrAlpha;
private FederationStateStoreFacade federationFacade;
private AllocationBookkeeper bookkeeper;
private SubClusterId homeSubcluster;
@Override
@ -186,26 +182,12 @@ public void reinitialize(
if (headroom == null) {
headroom = new ConcurrentHashMap<>();
lastHeartbeatTimeStamp = new ConcurrentHashMap<>();
}
hrAlpha = policy.getHeadroomAlpha();
this.federationFacade =
policyContext.getFederationStateStoreFacade();
this.homeSubcluster = policyContext.getHomeSubcluster();
this.subClusterTimeOut = this.federationFacade.getConf().getLong(
YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
if (this.subClusterTimeOut <= 0) {
LOG.info(
"{} configured to be {}, should be positive. Using default of {}.",
YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
this.subClusterTimeOut,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
}
}
@Override
@ -216,18 +198,18 @@ public void notifyOfResponse(SubClusterId subClusterId,
LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
response.getAvailableResources().getMemorySize());
}
lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
}
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
// object used to accumulate statistics about the answer, initialize with
// active subclusters. Create a new instance per call because this method
// can be called concurrently.
bookkeeper = new AllocationBookkeeper();
bookkeeper.reinitialize(federationFacade.getSubClusters(true));
AllocationBookkeeper bookkeeper = new AllocationBookkeeper();
bookkeeper.reinitialize(getActiveSubclusters(), timedOutSubClusters);
List<ResourceRequest> nonLocalizedRequests =
new ArrayList<ResourceRequest>();
@ -298,15 +280,6 @@ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
// handle all non-localized requests (ANY)
splitAnyRequests(nonLocalizedRequests, bookkeeper);
for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : bookkeeper
.getAnswer().entrySet()) {
// A new-cluster here will trigger new UAM launch, which might take a long
// time. We don't want too many requests stuck in this UAM before it is
// ready and starts heartbeating
if (!lastHeartbeatTimeStamp.containsKey(entry.getKey())) {
lastHeartbeatTimeStamp.put(entry.getKey(), System.currentTimeMillis());
}
}
return bookkeeper.getAnswer();
}
@ -540,8 +513,8 @@ protected final class AllocationBookkeeper {
private float totPolicyWeight = 0;
private void reinitialize(
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
Map<SubClusterId, SubClusterInfo> activeSubclusters,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
if (activeSubclusters == null) {
throw new YarnRuntimeException("null activeSubclusters received");
}
@ -573,17 +546,8 @@ private void reinitialize(
}
Set<SubClusterId> tmpSCSet = new HashSet<>(activeAndEnabledSC);
for (Map.Entry<SubClusterId, Long> entry : lastHeartbeatTimeStamp
.entrySet()) {
long duration = System.currentTimeMillis() - entry.getValue();
if (duration > subClusterTimeOut) {
LOG.warn(
"Subcluster {} does not have a success heartbeat for {}s, "
+ "skip routing asks there for this request",
entry.getKey(), (double) duration / 1000);
tmpSCSet.remove(entry.getKey());
}
}
tmpSCSet.removeAll(timedOutSubClusters);
if (tmpSCSet.size() < 1) {
LOG.warn("All active and enabled subclusters have expired last "
+ "heartbeat time. Ignore the expiry check for this request");

View File

@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -47,7 +48,8 @@ public void reinitialize(FederationPolicyInitializationContext policyContext)
@Override
public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> resourceRequests) throws YarnException {
List<ResourceRequest> resourceRequests,
Set<SubClusterId> timedOutSubClusters) throws YarnException {
throw new FederationPolicyException("The policy configured for this queue "
+ "rejects all routing requests by construction.");
}

View File

@ -23,6 +23,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -109,8 +110,8 @@ public void testNoSubclusters() throws YarnException {
String[] hosts = new String[] {"host1", "host2"};
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
((FederationAMRMProxyPolicy) localPolicy)
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) localPolicy).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
}
}

View File

@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -28,7 +29,6 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -71,8 +71,8 @@ public void testSplitAllocateRequest() throws Exception {
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
Assert.assertTrue(response.size() == 2);
for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response
.entrySet()) {
@ -94,8 +94,8 @@ public void testNotifyOfResponseFromUnknownSubCluster() throws Exception {
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
((FederationAMRMProxyPolicy) getPolicy()).notifyOfResponse(
SubClusterId.newInstance("sc3"), mock(AllocateResponse.class));

View File

@ -26,6 +26,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -80,9 +81,9 @@ public void testSplitAllocateRequest() throws YarnException {
hosts, 2 * 1024, 2, 1, 3, null, false);
HomeAMRMProxyPolicy federationPolicy =
(HomeAMRMProxyPolicy)getPolicy();
Map<SubClusterId, List<ResourceRequest>> response =
federationPolicy.splitResourceRequests(resourceRequests);
(HomeAMRMProxyPolicy) getPolicy();
Map<SubClusterId, List<ResourceRequest>> response = federationPolicy
.splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
assertEquals(1, response.size());
assertNotNull(response.get(HOME_SC_ID));
assertEquals(9, response.get(HOME_SC_ID).size());
@ -101,7 +102,8 @@ public void testHomeSubclusterNotActive() throws YarnException {
List<ResourceRequest> resourceRequests = createResourceRequests(
hosts, 2 * 1024, 2, 1, 3, null, false);
HomeAMRMProxyPolicy federationPolicy = (HomeAMRMProxyPolicy)getPolicy();
federationPolicy.splitResourceRequests(resourceRequests);
federationPolicy.splitResourceRequests(resourceRequests,
new HashSet<SubClusterId>());
fail("It should fail when the home subcluster is not active");
} catch(FederationPolicyException e) {
GenericTestUtils.assertExceptionContains("is not active", e);

View File

@ -154,8 +154,8 @@ public void testSplitBasedOnHeadroom() throws Exception {
prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
// pretty print requests
LOG.info("Initial headroom");
@ -180,7 +180,7 @@ public void testSplitBasedOnHeadroom() throws Exception {
((FederationAMRMProxyPolicy) getPolicy())
.notifyOfResponse(SubClusterId.newInstance("subcluster2"), ar);
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
.splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
LOG.info("After headroom update");
prettyPrintRequests(response);
@ -218,8 +218,8 @@ public void testStressPolicy() throws Exception {
long tstart = System.currentTimeMillis();
for (int i = 0; i < numIterations; i++) {
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
validateSplit(response, resourceRequests);
}
long tend = System.currentTimeMillis();
@ -243,8 +243,8 @@ public void testFWDAllZeroANY() throws Exception {
prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
// we expect all three to appear for a zero-sized ANY
@ -279,8 +279,8 @@ public void testSplitBasedOnHeadroomAndWeights() throws Exception {
prepPolicyWithHeadroom(true);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
// pretty print requests
prettyPrintRequests(response);
@ -354,8 +354,8 @@ public void testSplitAllocateRequest() throws Exception {
List<ResourceRequest> resourceRequests = createComplexRequest();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
validateSplit(response, resourceRequests);
prettyPrintRequests(response);
@ -697,8 +697,8 @@ public void testCancelWithLocalizedResource() throws YarnException {
ResourceRequest.ANY, 1024, 1, 1, 0, null, false));
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
checkExpectedAllocation(response, "subcluster0", 3, 1);
checkExpectedAllocation(response, "subcluster1", 1, 0);
@ -717,7 +717,7 @@ public void testCancelWithLocalizedResource() throws YarnException {
ResourceRequest.ANY, 1024, 1, 1, 100, null, false));
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
.splitResourceRequests(resourceRequests, new HashSet<SubClusterId>());
/*
* Since node request is a cancel, it should not be considered associated
@ -750,12 +750,13 @@ public void testSubClusterExpiry() throws Exception {
initializePolicy(conf);
List<ResourceRequest> resourceRequests = createSimpleRequest();
// Update the response timestamp for the first time
prepPolicyWithHeadroom(true);
// For first time, no sub-cluster expired
Set<SubClusterId> expiredSCList = new HashSet<>();
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
.splitResourceRequests(resourceRequests, expiredSCList);
// pretty print requests
prettyPrintRequests(response);
@ -776,11 +777,11 @@ public void testSubClusterExpiry() throws Exception {
Thread.sleep(800);
// Update the response timestamp for the second time, skipping sc0 and sc5
prepPolicyWithHeadroom(false);
// For the second time, sc0 and sc5 expired
expiredSCList.add(SubClusterId.newInstance("subcluster0"));
expiredSCList.add(SubClusterId.newInstance("subcluster5"));
response = ((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
.splitResourceRequests(resourceRequests, expiredSCList);
// pretty print requests
prettyPrintRequests(response);

View File

@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -69,8 +70,8 @@ public void testSplitAllocateRequest() throws Exception {
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
Map<SubClusterId, List<ResourceRequest>> response =
((FederationAMRMProxyPolicy) getPolicy())
.splitResourceRequests(resourceRequests);
((FederationAMRMProxyPolicy) getPolicy()).splitResourceRequests(
resourceRequests, new HashSet<SubClusterId>());
}

View File

@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -60,6 +61,7 @@
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
@ -163,10 +165,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers, including home RM.
* sub-cluster resource managers, including home RM, but not merged and
* returned back to AM yet.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
/**
* Remembers the last allocate response from all known sub-clusters. This is
* used together with sub-cluster timeout to assemble entries about
* cluster-wide info (e.g. AvailableResource, NumClusterNodes) in the allocate
* response back to AM.
*/
private Map<SubClusterId, AllocateResponse> lastSCResponse;
/**
* The async UAM registration result that is not consumed yet.
*/
private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;
// For unit test synchronization
@ -216,6 +230,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private SubClusterResolver subClusterResolver;
/**
* Records the last time a successful heartbeat response was received from a
* known sub-cluster. lastSCResponseTime.keySet() should be in sync with
* uamPool.getAllUAMIds().
*/
private Map<SubClusterId, Long> lastSCResponseTime;
private long subClusterTimeOut;
private long lastAMHeartbeatTime;
/** The policy used to split requests among sub-clusters. */
private FederationAMRMProxyPolicy policyInterpreter;
@ -232,6 +256,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
public FederationInterceptor() {
this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
this.asyncResponseSink = new ConcurrentHashMap<>();
this.lastSCResponse = new ConcurrentHashMap<>();
this.uamRegistrations = new ConcurrentHashMap<>();
this.uamRegisterFutures = new ConcurrentHashMap<>();
this.threadpool = Executors.newCachedThreadPool();
@ -241,6 +266,8 @@ public FederationInterceptor() {
this.amRegistrationResponse = null;
this.justRecovered = false;
this.finishAMCalled = false;
this.lastSCResponseTime = new ConcurrentHashMap<>();
this.lastAMHeartbeatTime = this.clock.getTime();
}
/**
@ -310,6 +337,19 @@ public void init(AMRMProxyApplicationContext appContext) {
this.heartbeatMaxWaitTimeMs =
conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_HB_MAX_WAIT_MS);
this.subClusterTimeOut =
conf.getLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
if (this.subClusterTimeOut <= 0) {
LOG.info(
"{} configured to be {}, should be positive. Using default of {}.",
YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
this.subClusterTimeOut,
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT);
this.subClusterTimeOut =
YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
}
}
@Override
@ -394,6 +434,10 @@ public void recover(Map<String, byte[]> recoveredDataMap) {
this.uamPool.registerApplicationMaster(subClusterId.getId(),
this.amRegistrationRequest);
// Set sub-cluster to be timed out initially
lastSCResponseTime.put(subClusterId,
clock.getTime() - subClusterTimeOut);
// Running containers from secondary RMs
for (Container container : response
.getContainersFromPreviousAttempts()) {
@ -580,6 +624,7 @@ public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
Preconditions.checkArgument(this.policyInterpreter != null,
"Allocate should be called after registerApplicationMaster");
this.lastAMHeartbeatTime = this.clock.getTime();
if (this.justRecovered) {
throw new ApplicationMasterNotRegisteredException(
@ -644,8 +689,7 @@ public AllocateResponse allocate(AllocateRequest request)
}
// Prepare the response to AM
AllocateResponse response =
RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
AllocateResponse response = generateBaseAllocationResponse();
// Merge all responses from response sink
mergeAllocateResponses(response);
@ -970,6 +1014,10 @@ public RegisterApplicationMasterResponse call() throws Exception {
response = uamPool.registerApplicationMaster(
subClusterId.getId(), amRegistrationRequest);
// Set sub-cluster to be timed out initially
lastSCResponseTime.put(subClusterId,
clock.getTime() - subClusterTimeOut);
if (response != null
&& response.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(
@ -1172,6 +1220,10 @@ private List<SubClusterId> registerAndAllocateWithNewSubClusters(
if (!subClusterId.equals(this.homeSubClusterId)
&& !this.uamPool.hasUAMId(subClusterId.getId())) {
newSubClusters.add(subClusterId);
// Set sub-cluster to be timed out initially
lastSCResponseTime.put(subClusterId,
clock.getTime() - subClusterTimeOut);
}
}
@ -1244,6 +1296,38 @@ public void run() {
return newSubClusters;
}
/**
 * Build the base allocation response that is later merged with the
 * asynchronously received sub-cluster responses. The cluster-wide entries
 * (AvailableResources, NumClusterNodes) are summed over the last response
 * remembered in lastSCResponse for every sub-cluster that is not reported
 * as timed out by getTimedOutSCs.
 *
 * @return a new AllocateResponse carrying the aggregated cluster-wide info
 */
protected AllocateResponse generateBaseAllocationResponse() {
  Resource totalAvailable = Resource.newInstance(0, 0);
  int totalNodes = 0;

  Set<SubClusterId> timedOut = getTimedOutSCs(false);
  for (Entry<SubClusterId, AllocateResponse> scEntry : lastSCResponse
      .entrySet()) {
    // Only sub-clusters that have not expired contribute to the totals
    if (!timedOut.contains(scEntry.getKey())) {
      AllocateResponse last = scEntry.getValue();
      Resource available = last.getAvailableResources();
      if (available != null) {
        totalAvailable = Resources.add(totalAvailable, available);
      }
      totalNodes += last.getNumClusterNodes();
    }
  }

  AllocateResponse merged =
      RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
  merged.setAvailableResources(totalAvailable);
  merged.setNumClusterNodes(totalNodes);
  return merged;
}
/**
* Merge the responses from all sub-clusters that we received asynchronously
* and keeps track of the containers received from each sub-cluster resource
@ -1345,17 +1429,6 @@ private void mergeAllocateResponse(AllocateResponse homeResponse,
}
}
if (otherResponse.getAvailableResources() != null) {
if (homeResponse.getAvailableResources() != null) {
homeResponse.setAvailableResources(
Resources.add(homeResponse.getAvailableResources(),
otherResponse.getAvailableResources()));
} else {
homeResponse
.setAvailableResources(otherResponse.getAvailableResources());
}
}
if (!isNullOrEmpty(otherResponse.getCompletedContainersStatuses())) {
if (!isNullOrEmpty(homeResponse.getCompletedContainersStatuses())) {
homeResponse.getCompletedContainersStatuses()
@ -1520,6 +1593,29 @@ private static AllocateRequest createAllocateRequest() {
return request;
}
/**
 * Collect the sub-clusters whose last recorded response time in
 * lastSCResponseTime is older than subClusterTimeOut.
 *
 * @param verbose whether to log a warning for each timed out sub-cluster
 * @return a new set holding the ids of the timed out sub-clusters
 */
protected Set<SubClusterId> getTimedOutSCs(boolean verbose) {
  Set<SubClusterId> expired = new HashSet<>();
  for (Map.Entry<SubClusterId, Long> scEntry : this.lastSCResponseTime
      .entrySet()) {
    long lastResponseTime = scEntry.getValue();
    // A response newer than the last AM heartbeat means the AM has not
    // heartbeated to us (and thus we have not heartbeated to the SCs) since
    // then, so such a sub-cluster is never treated as timed out.
    if (lastResponseTime <= this.lastAMHeartbeatTime) {
      long elapsedMs = this.clock.getTime() - lastResponseTime;
      if (elapsedMs > this.subClusterTimeOut) {
        SubClusterId scId = scEntry.getKey();
        if (verbose) {
          LOG.warn(
              "Subcluster {} doesn't have a successful heartbeat"
                  + " for {} seconds for {}",
              scId, (double) elapsedMs / 1000, this.attemptId);
        }
        expired.add(scId);
      }
    }
  }
  return expired;
}
/**
* Check to see if the specified containerId exists in the cache and log an
* error if not found.
@ -1553,7 +1649,8 @@ private boolean warnIfNotExists(ContainerId containerId, String actionName) {
*/
protected Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
List<ResourceRequest> askList) throws YarnException {
return this.policyInterpreter.splitResourceRequests(askList);
return policyInterpreter.splitResourceRequests(askList,
getTimedOutSCs(true));
}
@VisibleForTesting
@ -1602,6 +1699,8 @@ public void callback(AllocateResponse response) {
// Notify main thread about the response arrival
asyncResponseSink.notifyAll();
}
lastSCResponse.put(subClusterId, response);
lastSCResponseTime.put(subClusterId, clock.getTime());
// Notify policy of allocate response
try {

View File

@ -160,6 +160,10 @@ protected YarnConfiguration createConfiguration() {
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
// Set sub-cluster timeout to 500ms
conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
500);
return conf;
}
@ -568,6 +572,8 @@ public Object run() throws Exception {
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
// The first allocate call expects a fail-over exception and re-register
try {
@ -740,6 +746,60 @@ public void testAllocateResponse() throws Exception {
Assert.assertEquals(1, response.getUpdateErrors().size());
}
/**
 * Checks the sub-cluster timeout bookkeeping exposed by
 * generateBaseAllocationResponse() and getTimedOutSCs(): sub-clusters are
 * reported as timed out only once the AM has heartbeated again after the
 * timeout elapsed. Relies on the 500ms sub-cluster timeout configured in
 * createConfiguration().
 */
@Test
public void testSubClusterTimeOut() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application for the first time
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
registerSubCluster(SubClusterId.newInstance("SC-1"));
getContainersAndAssert(1, 1);
// Right after a successful allocation nothing is timed out yet
AllocateResponse allocateResponse =
interceptor.generateBaseAllocationResponse();
Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Let all SCs time out (home and SC-1), without an allocate from the AM.
// The sleep is longer than the configured sub-cluster timeout.
Thread.sleep(800);
// Should not be considered timed out, because there's no recent AM
// heartbeat
allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Generate a duplicate heartbeat from the AM, so that it won't really
// trigger a heartbeat to all SCs
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
// Set to lastResponseId - 1 so that it will be considered a duplicate
// heartbeat and thus not forwarded to all SCs
allocateRequest.setResponseId(lastResponseId - 1);
interceptor.allocate(allocateRequest);
// Should be considered timed out now that the AM has heartbeated again
allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(0, allocateResponse.getNumClusterNodes());
Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size());
return null;
}
});
}
@Test
public void testSecondAttempt() throws Exception {
final RegisterApplicationMasterRequest registerReq =
@ -803,6 +863,8 @@ public Object run() throws Exception {
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
registerResponse.getContainersFromPreviousAttempts().size());
@ -831,5 +893,4 @@ public Object run() throws Exception {
}
});
}
}