YARN-11357. Fix FederationClientInterceptor#submitApplication Can't Update SubClusterId (#5055)
This commit is contained in:
parent
562b693374
commit
ba77530ff4
@ -314,7 +314,7 @@ public GetNewApplicationResponse getNewApplication(
|
|||||||
// Try calling the getNewApplication method
|
// Try calling the getNewApplication method
|
||||||
List<SubClusterId> blacklist = new ArrayList<>();
|
List<SubClusterId> blacklist = new ArrayList<>();
|
||||||
int activeSubClustersCount = getActiveSubClustersCount();
|
int activeSubClustersCount = getActiveSubClustersCount();
|
||||||
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
|
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
GetNewApplicationResponse response =
|
GetNewApplicationResponse response =
|
||||||
@ -470,7 +470,7 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
// but if the number of Active SubClusters is less than this number at this time,
|
// but if the number of Active SubClusters is less than this number at this time,
|
||||||
// we should provide a high number of retry according to the number of Active SubClusters.
|
// we should provide a high number of retry according to the number of Active SubClusters.
|
||||||
int activeSubClustersCount = getActiveSubClustersCount();
|
int activeSubClustersCount = getActiveSubClustersCount();
|
||||||
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
|
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
|
||||||
|
|
||||||
// Try calling the SubmitApplication method
|
// Try calling the SubmitApplication method
|
||||||
SubmitApplicationResponse response =
|
SubmitApplicationResponse response =
|
||||||
@ -484,7 +484,7 @@ public SubmitApplicationResponse submitApplication(
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e){
|
} catch (Exception e) {
|
||||||
routerMetrics.incrAppsFailedSubmitted();
|
routerMetrics.incrAppsFailedSubmitted();
|
||||||
RouterServerUtil.logAndThrowException(e.getMessage(), e);
|
RouterServerUtil.logAndThrowException(e.getMessage(), e);
|
||||||
}
|
}
|
||||||
@ -543,7 +543,7 @@ private SubmitApplicationResponse invokeSubmitApplication(
|
|||||||
ApplicationHomeSubCluster appHomeSubCluster =
|
ApplicationHomeSubCluster appHomeSubCluster =
|
||||||
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
|
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
|
||||||
|
|
||||||
if (exists || retryCount == 0) {
|
if (!exists || retryCount == 0) {
|
||||||
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
|
addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
|
||||||
} else {
|
} else {
|
||||||
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
|
updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
|
||||||
@ -563,8 +563,8 @@ private SubmitApplicationResponse invokeSubmitApplication(
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
|
RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
|
||||||
TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
|
TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
|
||||||
LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.",
|
LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.",
|
||||||
applicationId, subClusterId, e);
|
applicationId, retryCount, subClusterId, e);
|
||||||
if (subClusterId != null) {
|
if (subClusterId != null) {
|
||||||
blackList.add(subClusterId);
|
blackList.add(subClusterId);
|
||||||
}
|
}
|
||||||
@ -1948,4 +1948,9 @@ private void updateReservationHomeSubCluster(SubClusterId subClusterId,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setNumSubmitRetries(int numSubmitRetries) {
|
||||||
|
this.numSubmitRetries = numSubmitRetries;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,11 +18,14 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER;
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
import org.apache.hadoop.test.LambdaTestUtils;
|
||||||
@ -48,7 +51,11 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
import org.junit.Assume;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -64,14 +71,22 @@
|
|||||||
* It tests the case with SubClusters down and the Router logic of retries. We
|
* It tests the case with SubClusters down and the Router logic of retries. We
|
||||||
* have 1 good SubCluster and 2 bad ones for all the tests.
|
* have 1 good SubCluster and 2 bad ones for all the tests.
|
||||||
*/
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestFederationClientInterceptorRetry
|
public class TestFederationClientInterceptorRetry
|
||||||
extends BaseRouterClientRMTest {
|
extends BaseRouterClientRMTest {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
|
LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static Collection<String[]> getParameters() {
|
||||||
|
return Arrays.asList(new String[][] {{UniformBroadcastPolicyManager.class.getName()},
|
||||||
|
{TestSequentialBroadcastPolicyManager.class.getName()}});
|
||||||
|
}
|
||||||
|
|
||||||
private TestableFederationClientInterceptor interceptor;
|
private TestableFederationClientInterceptor interceptor;
|
||||||
private MemoryFederationStateStore stateStore;
|
private MemoryFederationStateStore stateStore;
|
||||||
private FederationStateStoreTestUtil stateStoreUtil;
|
private FederationStateStoreTestUtil stateStoreUtil;
|
||||||
|
private String routerPolicyManagerName;
|
||||||
|
|
||||||
private String user = "test-user";
|
private String user = "test-user";
|
||||||
|
|
||||||
@ -84,6 +99,10 @@ public class TestFederationClientInterceptorRetry
|
|||||||
|
|
||||||
private static List<SubClusterId> scs = new ArrayList<>();
|
private static List<SubClusterId> scs = new ArrayList<>();
|
||||||
|
|
||||||
|
/**
 * Creates the test instance for one parameterized run.
 *
 * @param policyManagerName class name of the policy manager to store for this run.
 */
public TestFederationClientInterceptorRetry(String policyManagerName) {
  this.routerPolicyManagerName = policyManagerName;
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
super.setUpConfig();
|
super.setUpConfig();
|
||||||
@ -150,8 +169,7 @@ protected YarnConfiguration createConfiguration() {
|
|||||||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||||
+ "," + TestableFederationClientInterceptor.class.getName());
|
+ "," + TestableFederationClientInterceptor.class.getName());
|
||||||
|
|
||||||
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
|
conf.set(FEDERATION_POLICY_MANAGER, this.routerPolicyManagerName);
|
||||||
UniformBroadcastPolicyManager.class.getName());
|
|
||||||
|
|
||||||
// Disable StateStoreFacade cache
|
// Disable StateStoreFacade cache
|
||||||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
||||||
@ -283,4 +301,56 @@ public void testSubmitApplicationOneBadOneGood()
|
|||||||
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
|
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
|
||||||
Assert.assertEquals(good, respSubClusterId);
|
Assert.assertEquals(good, respSubClusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubmitApplicationTwoBadOneGood() throws Exception {
|
||||||
|
|
||||||
|
LOG.info("Test submitApplication with two bad, one good SC.");
|
||||||
|
|
||||||
|
// This test must require the TestSequentialRouterPolicy policy
|
||||||
|
Assume.assumeThat(routerPolicyManagerName,
|
||||||
|
is(TestSequentialBroadcastPolicyManager.class.getName()));
|
||||||
|
|
||||||
|
setupCluster(Arrays.asList(bad1, bad2, good));
|
||||||
|
final ApplicationId appId =
|
||||||
|
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||||
|
|
||||||
|
// Use the TestSequentialRouterPolicy strategy,
|
||||||
|
// which will sort the SubClusterId because good=0, bad1=1, bad2=2
|
||||||
|
// We will get 2, 1, 0 [bad2, bad1, good]
|
||||||
|
// Set the retryNum to 1
|
||||||
|
// 1st time will use bad2, 2nd time will use bad1
|
||||||
|
// bad1 is updated to stateStore
|
||||||
|
interceptor.setNumSubmitRetries(1);
|
||||||
|
final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
|
||||||
|
LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
|
||||||
|
() -> interceptor.submitApplication(request));
|
||||||
|
|
||||||
|
// We will get bad1
|
||||||
|
checkSubmitSubCluster(appId, bad1);
|
||||||
|
|
||||||
|
// Set the retryNum to 2
|
||||||
|
// 1st time will use bad2, 2nd time will use bad1, 3rd good
|
||||||
|
interceptor.setNumSubmitRetries(2);
|
||||||
|
SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request);
|
||||||
|
Assert.assertNotNull(submitAppResponse);
|
||||||
|
|
||||||
|
// We will get good
|
||||||
|
checkSubmitSubCluster(appId, good);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCluster)
|
||||||
|
throws YarnException {
|
||||||
|
GetApplicationHomeSubClusterRequest getAppRequest =
|
||||||
|
GetApplicationHomeSubClusterRequest.newInstance(appId);
|
||||||
|
GetApplicationHomeSubClusterResponse getAppResponse =
|
||||||
|
stateStore.getApplicationHomeSubCluster(getAppRequest);
|
||||||
|
Assert.assertNotNull(getAppResponse);
|
||||||
|
Assert.assertNotNull(getAppResponse);
|
||||||
|
ApplicationHomeSubCluster responseHomeSubCluster =
|
||||||
|
getAppResponse.getApplicationHomeSubCluster();
|
||||||
|
Assert.assertNotNull(responseHomeSubCluster);
|
||||||
|
SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
|
||||||
|
Assert.assertEquals(expectSubCluster, respSubClusterId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This PolicyManager is used for testing and will contain the
|
||||||
|
* {@link TestSequentialRouterPolicy} policy.
|
||||||
|
*
|
||||||
|
* When we test FederationClientInterceptor Retry,
|
||||||
|
* we hope that SubCluster can return in a certain order, not randomly.
|
||||||
|
* We can view the policy description by linking to TestSequentialRouterPolicy.
|
||||||
|
*/
|
||||||
|
public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager {
|
||||||
|
public TestSequentialBroadcastPolicyManager() {
|
||||||
|
// this structurally hard-codes two compatible policies for Router and
|
||||||
|
// AMRMProxy.
|
||||||
|
routerFederationPolicy = TestSequentialRouterPolicy.class;
|
||||||
|
amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,78 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||||
|
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.policies.router.AbstractRouterPolicy;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||||
|
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a test strategy,
|
||||||
|
* the purpose of this strategy is to return subClusters in descending order of subClusterId.
|
||||||
|
*
|
||||||
|
* This strategy is to verify the situation of Retry during the use of FederationClientInterceptor.
|
||||||
|
* The conditions of use are as follows:
|
||||||
|
* 1.We require subClusterId to be an integer.
|
||||||
|
* 2.The larger the subCluster, the sooner the representative is selected.
|
||||||
|
*
|
||||||
|
* We have 4 subClusters, 2 normal subClusters, 2 bad subClusters.
|
||||||
|
* We expect to select badSubClusters first and then goodSubClusters during testing.
|
||||||
|
* We can set the subCluster like this, good1 = [0], good2 = [1], bad1 = [2], bad2 = [3].
|
||||||
|
* This strategy will return [3, 2, 1, 0],
|
||||||
|
* The selection order of subCluster is bad2, bad1, good2, good1.
|
||||||
|
*/
|
||||||
|
public class TestSequentialRouterPolicy extends AbstractRouterPolicy {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reinitialize(FederationPolicyInitializationContext policyContext)
|
||||||
|
throws FederationPolicyInitializationException {
|
||||||
|
FederationPolicyInitializationContextValidator.validate(policyContext,
|
||||||
|
this.getClass().getCanonicalName());
|
||||||
|
setPolicyContext(policyContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SubClusterId chooseSubCluster(String queue,
|
||||||
|
Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException {
|
||||||
|
/**
|
||||||
|
* This strategy is only suitable for testing. We need to obtain subClusters sequentially.
|
||||||
|
* We have 3 subClusters, 1 goodSubCluster and 2 badSubClusters.
|
||||||
|
* The sc-id of goodSubCluster is 0, and the sc-id of badSubCluster is 1 and 2.
|
||||||
|
* We hope Return in reverse order, that is, return 2, 1, 0
|
||||||
|
* Return to badCluster first.
|
||||||
|
*/
|
||||||
|
List<SubClusterId> subClusterIds = new ArrayList<>(preSelectSubClusters.keySet());
|
||||||
|
if (subClusterIds.size() > 1) {
|
||||||
|
subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId()));
|
||||||
|
}
|
||||||
|
if(CollectionUtils.isNotEmpty(subClusterIds)){
|
||||||
|
return subClusterIds.get(0);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user