From 52daa6d971ae408d121b3737ea8c0575e7e8516d Mon Sep 17 00:00:00 2001 From: Subru Krishnan Date: Mon, 26 Jun 2017 13:27:26 -0700 Subject: [PATCH] YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru). --- .../hadoop/yarn/conf/YarnConfiguration.java | 8 + .../conf/TestYarnConfigurationFields.java | 5 + .../yarn/conf/TestYarnConfiguration.java | 1 - ...nPolicyInitializationContextValidator.java | 2 +- .../policies/FederationPolicyUtils.java | 35 + .../policies/RouterPolicyFacade.java | 3 +- .../router/HashBasedRouterPolicy.java | 5 + .../router/LoadBasedRouterPolicy.java | 5 + .../policies/router/PriorityRouterPolicy.java | 5 + .../router/UniformRandomRouterPolicy.java | 4 + .../router/WeightedRandomRouterPolicy.java | 5 + .../server/MockResourceManagerFacade.java | 165 ++++- .../router/BaseRouterPoliciesTest.java | 28 + .../utils/FederationStateStoreTestUtil.java | 15 +- .../hadoop-yarn-server-router/pom.xml | 5 + .../yarn/server/router/RouterServerUtil.java | 63 ++ .../AbstractClientRequestInterceptor.java | 41 +- .../DefaultClientRequestInterceptor.java | 31 +- .../clientrm/FederationClientInterceptor.java | 684 ++++++++++++++++++ .../clientrm/BaseRouterClientRMTest.java | 22 +- .../TestFederationClientInterceptor.java | 403 +++++++++++ .../TestFederationClientInterceptorRetry.java | 295 ++++++++ .../TestableFederationClientInterceptor.java | 75 ++ 23 files changed, 1866 insertions(+), 39 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1432867ab6e..0c0dd113d7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2674,6 +2674,14 @@ public class YarnConfiguration extends Configuration { "org.apache.hadoop.yarn.server.router.rmadmin." + "DefaultRMAdminRequestInterceptor"; + /** + * The number of retries for GetNewApplication and SubmitApplication in + * {@code FederationClientInterceptor}. 
+ */ + public static final String ROUTER_CLIENTRM_SUBMIT_RETRY = + ROUTER_PREFIX + "submit.retry"; + public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index c3cb78d806d..a439cd5ac5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -136,6 +136,11 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPrefixToSkipCompare .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED); + // Ignore all Router Federation variables + + configurationPrefixToSkipCompare + .add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY); + // Set by container-executor.cfg configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java index 738942300a0..a053fdb9376 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java @@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.junit.Test; import java.net.InetSocketAddress; -import java.net.SocketAddress; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java index 3c44e7e1dbb..da63bc1de46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java @@ -51,7 +51,7 @@ public final class FederationPolicyInitializationContextValidator { if (policyContext.getFederationSubclusterResolver() == null) { throw new FederationPolicyInitializationException( - "The FederationStateStoreFacase provided is null. Cannot" + "The FederationSubclusterResolver provided is null. 
Cannot" + " reinitalize successfully."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java index 37ce9423557..97e484846bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.federation.policies; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -41,6 +43,9 @@ public final class FederationPolicyUtils { private static final Logger LOG = LoggerFactory.getLogger(FederationPolicyUtils.class); + public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE = + "No active SubCluster available to submit the request."; + /** Disable constructor. */ private FederationPolicyUtils() { } @@ -165,4 +170,34 @@ public final class FederationPolicyUtils { return federationPolicyManager.getAMRMPolicy(context, oldPolicy); } + /** + * Validate if there is any active subcluster that is not blacklisted, it will + * throw an exception if there are no usable subclusters. + * + * @param activeSubClusters the list of subClusters as identified by + * {@link SubClusterId} currently active. + * @param blackListSubClusters the list of subClusters as identified by + * {@link SubClusterId} to blackList from the selection of the home + * subCluster. + * @throws FederationPolicyException if there are no usable subclusters. 
+ */ + public static void validateSubClusterAvailability( + List<SubClusterId> activeSubClusters, + List<SubClusterId> blackListSubClusters) + throws FederationPolicyException { + if (activeSubClusters != null && !activeSubClusters.isEmpty()) { + if (blackListSubClusters == null) { + return; + } + for (SubClusterId scId : activeSubClusters) { + if (!blackListSubClusters.contains(scId)) { + // There is at least one active subcluster + return; + } + } + } + throw new FederationPolicyException( + FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE); + } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java index 44c1b10a729..52c290553cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -56,7 +57,7 @@ public class RouterPolicyFacade { @VisibleForTesting Map<String, FederationRouterPolicy> globalPolicyMap; - public RouterPolicyFacade(YarnConfiguration conf, + public RouterPolicyFacade(Configuration conf, FederationStateStoreFacade facade, SubClusterResolver resolver, SubClusterId homeSubcluster) throws FederationPolicyInitializationException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java index 257a9fef67d..cc118806653 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -76,6 +77,10 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy { Map<SubClusterId, SubClusterInfo>
activeSubclusters = getActiveSubclusters(); + FederationPolicyUtils.validateSubClusterAvailability( + new ArrayList(activeSubclusters.keySet()), + blackListSubClusters); + if (blackListSubClusters != null) { // Remove from the active SubClusters from StateStore the blacklisted ones diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java index c12400194ad..06e445bd60c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java @@ -17,12 +17,14 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; @@ -72,6 +74,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy { Map activeSubclusters = getActiveSubclusters(); + FederationPolicyUtils.validateSubClusterAvailability( + new ArrayList(activeSubclusters.keySet()), blacklist); + Map weights = getPolicyInfo().getRouterPolicyWeights(); SubClusterIdInfo chosen = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java index 59f8767970a..a1f7666a9ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java @@ -17,11 +17,13 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -44,6 +46,9 @@ public class PriorityRouterPolicy extends 
AbstractRouterPolicy { Map activeSubclusters = getActiveSubclusters(); + FederationPolicyUtils.validateSubClusterAvailability( + new ArrayList(activeSubclusters.keySet()), blacklist); + // This finds the sub-cluster with the highest weight among the // currently active ones. Map weights = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java index bc729b74098..7a8be91fcd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -86,6 +87,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy { List list = new ArrayList<>(activeSubclusters.keySet()); + FederationPolicyUtils.validateSubClusterAvailability(list, + blackListSubClusters); + if (blackListSubClusters != null) { // Remove from the active SubClusters from StateStore the blacklisted ones diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java index 7f230a711ee..aec75760414 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java @@ -18,12 +18,14 @@ package org.apache.hadoop.yarn.server.federation.policies.router; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; @@ -51,6 +53,9 @@ public class 
WeightedRandomRouterPolicy extends AbstractRouterPolicy { Map activeSubclusters = getActiveSubclusters(); + FederationPolicyUtils.validateSubClusterAvailability( + new ArrayList(activeSubclusters.keySet()), blacklist); + // note: we cannot pre-compute the weights, as the set of activeSubcluster // changes dynamically (and this would unfairly spread the load to // sub-clusters adjacent to an inactive one), hence we need to count/scan diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 65c12c642c7..68c55ac1a97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server; import java.io.IOException; +import java.net.ConnectException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -154,6 +155,7 @@ import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.common.base.Strings; /** @@ -175,6 +177,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, new HashMap(); private AtomicInteger containerIndex = new AtomicInteger(0); private Configuration conf; + private int subClusterId; + final private AtomicInteger applicationCounter = new AtomicInteger(0); + + // True if the Mock RM is running, false otherwise. + // This property allows us to write tests for specific scenario as Yarn RM + // down e.g. network issue, failover. + private boolean isRunning; private boolean shouldReRegisterNext = false; @@ -187,14 +196,25 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public MockResourceManagerFacade(Configuration conf, int startContainerIndex) { + this(conf, startContainerIndex, 0, true); + } + + public MockResourceManagerFacade(Configuration conf, int startContainerIndex, + int subClusterId, boolean isRunning) { this.conf = conf; this.containerIndex.set(startContainerIndex); + this.subClusterId = subClusterId; + this.isRunning = isRunning; } public void setShouldReRegisterNext() { shouldReRegisterNext = true; } + public void setRunningMode(boolean mode) { + this.isRunning = mode; + } + private static String getAppIdentifier() throws IOException { AMRMTokenIdentifier result = null; UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser(); @@ -208,10 +228,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, return result != null ? 
result.getApplicationAttemptId().toString() : ""; } + private void validateRunning() throws ConnectException { + if (!isRunning) { + throw new ConnectException("RM is stopped"); + } + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnException, IOException { + + validateRunning(); + String amrmToken = getAppIdentifier(); LOG.info("Registering application attempt: " + amrmToken); @@ -248,6 +277,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnException, IOException { + + validateRunning(); + String amrmToken = getAppIdentifier(); LOG.info("Finishing application attempt: " + amrmToken); @@ -284,6 +316,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { + + validateRunning(); + if (request.getAskList() != null && request.getAskList().size() > 0 && request.getReleaseList() != null && request.getReleaseList().size() > 0) { @@ -391,6 +426,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnException, IOException { + validateRunning(); + GetApplicationReportResponse response = Records.newRecord(GetApplicationReportResponse.class); ApplicationReport report = Records.newRecord(ApplicationReport.class); @@ -407,6 +444,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, GetApplicationAttemptReportRequest request) throws YarnException, IOException { + validateRunning(); + GetApplicationAttemptReportResponse response = Records.newRecord(GetApplicationAttemptReportResponse.class); ApplicationAttemptReport report = @@ -420,12 +459,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, @Override public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnException, IOException { - return GetNewApplicationResponse.newInstance(null, null, null); + + validateRunning(); + + return GetNewApplicationResponse.newInstance(ApplicationId.newInstance( + subClusterId, applicationCounter.incrementAndGet()), null, null); } @Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException, IOException { + + validateRunning(); + ApplicationId appId = null; if (request.getApplicationSubmissionContext() != null) { appId = request.getApplicationSubmissionContext().getApplicationId(); @@ -438,6 +484,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, @Override public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnException, IOException { + + validateRunning(); + ApplicationId appId = null; if (request.getApplicationId() != null) { appId = request.getApplicationId(); @@ -453,48 +502,72 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, @Override public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnException, IOException { + + validateRunning(); + return GetClusterMetricsResponse.newInstance(null); } @Override public GetApplicationsResponse getApplications(GetApplicationsRequest request) throws YarnException, IOException { + + validateRunning(); + 
return GetApplicationsResponse.newInstance(null); } @Override public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) throws YarnException, IOException { + + validateRunning(); + return GetClusterNodesResponse.newInstance(null); } @Override public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) throws YarnException, IOException { + + validateRunning(); + return GetQueueInfoResponse.newInstance(null); } @Override public GetQueueUserAclsInfoResponse getQueueUserAcls( GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + + validateRunning(); + return GetQueueUserAclsInfoResponse.newInstance(null); } @Override public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnException, IOException { + + validateRunning(); + return GetDelegationTokenResponse.newInstance(null); } @Override public RenewDelegationTokenResponse renewDelegationToken( RenewDelegationTokenRequest request) throws YarnException, IOException { + + validateRunning(); + return RenewDelegationTokenResponse.newInstance(0); } @Override public CancelDelegationTokenResponse cancelDelegationToken( CancelDelegationTokenRequest request) throws YarnException, IOException { + + validateRunning(); + return CancelDelegationTokenResponse.newInstance(); } @@ -502,36 +575,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( MoveApplicationAcrossQueuesRequest request) throws YarnException, IOException { + + validateRunning(); + return MoveApplicationAcrossQueuesResponse.newInstance(); } @Override public GetApplicationAttemptsResponse getApplicationAttempts( GetApplicationAttemptsRequest request) throws YarnException, IOException { + + validateRunning(); + return GetApplicationAttemptsResponse.newInstance(null); } @Override public GetContainerReportResponse getContainerReport( GetContainerReportRequest request) throws YarnException, IOException { + + validateRunning(); + return GetContainerReportResponse.newInstance(null); } @Override public GetContainersResponse getContainers(GetContainersRequest request) throws YarnException, IOException { + + validateRunning(); + return GetContainersResponse.newInstance(null); } @Override public ReservationSubmissionResponse submitReservation( ReservationSubmissionRequest request) throws YarnException, IOException { + + validateRunning(); + return ReservationSubmissionResponse.newInstance(); } @Override public ReservationListResponse listReservations( ReservationListRequest request) throws YarnException, IOException { + + validateRunning(); + return ReservationListResponse .newInstance(new ArrayList()); } @@ -539,18 +630,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, @Override public ReservationUpdateResponse updateReservation( ReservationUpdateRequest request) throws YarnException, IOException { + + validateRunning(); + return ReservationUpdateResponse.newInstance(); } @Override public ReservationDeleteResponse deleteReservation( ReservationDeleteRequest request) throws YarnException, IOException { + + validateRunning(); + return ReservationDeleteResponse.newInstance(); } @Override public GetNodesToLabelsResponse getNodeToLabels( GetNodesToLabelsRequest request) throws YarnException, IOException { + + validateRunning(); + return GetNodesToLabelsResponse .newInstance(new HashMap>()); } @@ -558,18 +658,27 @@ public class MockResourceManagerFacade implements 
ApplicationClientProtocol, @Override public GetClusterNodeLabelsResponse getClusterNodeLabels( GetClusterNodeLabelsRequest request) throws YarnException, IOException { + + validateRunning(); + return GetClusterNodeLabelsResponse.newInstance(new ArrayList()); } @Override public GetLabelsToNodesResponse getLabelsToNodes( GetLabelsToNodesRequest request) throws YarnException, IOException { + + validateRunning(); + return GetLabelsToNodesResponse.newInstance(null); } @Override public GetNewReservationResponse getNewReservation( GetNewReservationRequest request) throws YarnException, IOException { + + validateRunning(); + return GetNewReservationResponse .newInstance(ReservationId.newInstance(0, 0)); } @@ -577,6 +686,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, @Override public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { + + validateRunning(); + return FailApplicationAttemptResponse.newInstance(); } @@ -584,12 +696,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public UpdateApplicationPriorityResponse updateApplicationPriority( UpdateApplicationPriorityRequest request) throws YarnException, IOException { + + validateRunning(); + return UpdateApplicationPriorityResponse.newInstance(null); } @Override public SignalContainerResponse signalToContainer( SignalContainerRequest request) throws YarnException, IOException { + + validateRunning(); + return new SignalContainerResponsePBImpl(); } @@ -597,18 +715,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( UpdateApplicationTimeoutsRequest request) throws YarnException, IOException { + + validateRunning(); + return UpdateApplicationTimeoutsResponse.newInstance(); } @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) throws StandbyException, YarnException, IOException { + + validateRunning(); + return RefreshQueuesResponse.newInstance(); } @Override public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws StandbyException, YarnException, IOException { + + validateRunning(); + return RefreshNodesResponse.newInstance(); } @@ -616,6 +743,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( RefreshSuperUserGroupsConfigurationRequest request) throws StandbyException, YarnException, IOException { + + validateRunning(); + return RefreshSuperUserGroupsConfigurationResponse.newInstance(); } @@ -623,36 +753,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request) throws StandbyException, YarnException, IOException { + + validateRunning(); + return RefreshUserToGroupsMappingsResponse.newInstance(); } @Override public RefreshAdminAclsResponse refreshAdminAcls( RefreshAdminAclsRequest request) throws YarnException, IOException { + + validateRunning(); + return RefreshAdminAclsResponse.newInstance(); } @Override public RefreshServiceAclsResponse refreshServiceAcls( RefreshServiceAclsRequest request) throws YarnException, IOException { + + validateRunning(); + return RefreshServiceAclsResponse.newInstance(); } @Override public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) 
throws YarnException, IOException { + + validateRunning(); + return UpdateNodeResourceResponse.newInstance(); } @Override public RefreshNodesResourcesResponse refreshNodesResources( RefreshNodesResourcesRequest request) throws YarnException, IOException { + + validateRunning(); + return RefreshNodesResourcesResponse.newInstance(); } @Override public AddToClusterNodeLabelsResponse addToClusterNodeLabels( AddToClusterNodeLabelsRequest request) throws YarnException, IOException { + + validateRunning(); + return AddToClusterNodeLabelsResponse.newInstance(); } @@ -660,12 +808,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels( RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException { + + validateRunning(); + return RemoveFromClusterNodeLabelsResponse.newInstance(); } @Override public ReplaceLabelsOnNodeResponse replaceLabelsOnNode( ReplaceLabelsOnNodeRequest request) throws YarnException, IOException { + + validateRunning(); + return ReplaceLabelsOnNodeResponse.newInstance(); } @@ -673,6 +827,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public CheckForDecommissioningNodesResponse checkForDecommissioningNodes( CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest) throws YarnException, IOException { + + validateRunning(); + return CheckForDecommissioningNodesResponse.newInstance(null); } @@ -680,11 +837,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority( RefreshClusterMaxPriorityRequest request) throws YarnException, IOException { + + validateRunning(); + return RefreshClusterMaxPriorityResponse.newInstance(); } @Override public String[] getGroupsForUser(String user) throws IOException { + + validateRunning(); + return new String[0]; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java index c7a7767794d..d09ba754d55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java @@ -26,6 +26,7 @@ import java.util.Random; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo; @@ -87,4 +88,31 @@ public abstract class BaseRouterPoliciesTest Assert.assertEquals(removed, chosen); } } + + /** + * This test validates the correctness of blacklist logic in case the cluster + * has no active subclusters. 
+ */ + @Test + public void testAllBlacklistSubcluster() throws YarnException { + FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy(); + ApplicationSubmissionContext applicationSubmissionContext = + ApplicationSubmissionContext.newInstance(null, null, null, null, null, + false, false, 0, Resources.none(), null, false, null, null); + Map activeSubClusters = + getActiveSubclusters(); + if (activeSubClusters != null && activeSubClusters.size() > 1 + && !(localPolicy instanceof RejectRouterPolicy)) { + List blacklistSubclusters = + new ArrayList(activeSubClusters.keySet()); + try { + localPolicy.getHomeSubcluster(applicationSubmissionContext, + blacklistSubclusters); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java index 649a61b4ef8..423bf86b315 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; @@ -69,7 +70,7 @@ public class FederationStateStoreTestUtil { SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability"); } - private void registerSubCluster(SubClusterId subClusterId) + public void registerSubCluster(SubClusterId subClusterId) throws YarnException { SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId); @@ -164,4 +165,16 @@ public class FederationStateStoreTestUtil { return result.getPolicyConfiguration(); } + public void deregisterAllSubClusters() throws YarnException { + for (SubClusterId sc : getAllSubClusterIds(true)) { + deRegisterSubCluster(sc); + } + } + + private void deRegisterSubCluster(SubClusterId subClusterId) + throws YarnException { + stateStore.deregisterSubCluster(SubClusterDeregisterRequest + .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED)); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml index f9169e16fd1..f27b2b224fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml @@ -55,6 +55,11 @@ test + + org.apache.hadoop + hadoop-yarn-server-common + + org.apache.hadoop hadoop-yarn-server-common diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java new file mode 100644 index 00000000000..cc96da62331 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Common utility methods used by the Router server. + * + */ +@Private +@Unstable +public final class RouterServerUtil { + + /** Disable constructor. */ + private RouterServerUtil() { + } + + public static final Logger LOG = + LoggerFactory.getLogger(RouterServerUtil.class); + + /** + * Throws an exception due to an error. + * + * @param errMsg the error message + * @param t the throwable raised in the called class. 
+ * @throws YarnException on failure + */ + @Public + @Unstable + public static void logAndThrowException(String errMsg, Throwable t) + throws YarnException { + if (t != null) { + LOG.error(errMsg, t); + throw new YarnException(errMsg, t); + } else { + LOG.error(errMsg); + throw new YarnException(errMsg); + } + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java index 5980b03a66b..01ba3bdcadf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java @@ -18,7 +18,13 @@ package org.apache.hadoop.yarn.server.router.clientrm; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements the {@link ClientRequestInterceptor} interface and provides common @@ -28,9 +34,16 @@ import org.apache.hadoop.conf.Configuration; */ public abstract class AbstractClientRequestInterceptor implements ClientRequestInterceptor { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractClientRequestInterceptor.class); + private Configuration conf; private ClientRequestInterceptor nextInterceptor; + @SuppressWarnings("checkstyle:visibilitymodifier") + protected UserGroupInformation user = null; + /** * Sets the {@link ClientRequestInterceptor} in the chain. */ @@ -63,9 +76,10 @@ public abstract class AbstractClientRequestInterceptor * Initializes the {@link ClientRequestInterceptor}. 
*/ @Override - public void init(String user) { + public void init(String userName) { + setupUser(userName); if (this.nextInterceptor != null) { - this.nextInterceptor.init(user); + this.nextInterceptor.init(userName); } } @@ -87,4 +101,27 @@ public abstract class AbstractClientRequestInterceptor return this.nextInterceptor; } + private void setupUser(String userName) { + + try { + // Do not create a proxy user if user name matches the user name on + // current UGI + if (userName.equalsIgnoreCase( + UserGroupInformation.getCurrentUser().getUserName())) { + user = UserGroupInformation.getCurrentUser(); + } else { + user = UserGroupInformation.createProxyUser(userName, + UserGroupInformation.getCurrentUser()); + } + } catch (IOException e) { + String message = "Error while creating Router ClientRM Service for user:"; + if (user != null) { + message += ", user: " + user; + } + + LOG.info(message); + throw new YarnRuntimeException(message, e); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java index 9e2bfed9543..71de6b470e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; @@ -85,8 +84,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRespo import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -98,27 +95,14 @@ import com.google.common.annotations.VisibleForTesting; */ public class DefaultClientRequestInterceptor extends AbstractClientRequestInterceptor { - private static final Logger LOG = - LoggerFactory.getLogger(DefaultClientRequestInterceptor.class); private ApplicationClientProtocol clientRMProxy; - private UserGroupInformation user = null; @Override public void init(String userName) { super.init(userName); + + final Configuration conf = this.getConf(); try { - // Do not create a proxy user if user name matches the user name on - // current UGI - if (userName.equalsIgnoreCase( - UserGroupInformation.getCurrentUser().getUserName())) { - user = UserGroupInformation.getCurrentUser(); - } else { - user = UserGroupInformation.createProxyUser(userName, - UserGroupInformation.getCurrentUser()); - } - - final Configuration conf = this.getConf(); - clientRMProxy = user.doAs(new PrivilegedExceptionAction() { @Override @@ -127,16 +111,9 @@ public class DefaultClientRequestInterceptor 
ApplicationClientProtocol.class); } }); - } catch (IOException e) { - String message = "Error while creating Router ClientRM Service for user:"; - if (user != null) { - message += ", user: " + user; - } - - LOG.info(message); - throw new YarnRuntimeException(message, e); } catch (Exception e) { - throw new YarnRuntimeException(e); + throw new YarnRuntimeException( + "Unable to create the interface to reach the YarnRM", e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java new file mode 100644 index 00000000000..ecf53ac10c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -0,0 +1,684 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest; +import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest; +import 
org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade; +import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.router.RouterServerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Extends the {@code AbstractRequestInterceptorClient} class and provides an + * implementation for federation of YARN RM and scaling an application across + * multiple YARN SubClusters. All the federation specific implementation is + * encapsulated in this class. This is always the last intercepter in the chain. + */ +public class FederationClientInterceptor + extends AbstractClientRequestInterceptor { + + /* + * TODO YARN-6740 Federation Router (hiding multiple RMs for + * ApplicationClientProtocol) phase 2. + * + * The current implementation finalized the main 4 calls (getNewApplication, + * submitApplication, forceKillApplication and getApplicationReport). Those + * allow us to execute applications E2E. 
+ */
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationClientInterceptor.class);
+
+  private int numSubmitRetries;
+  private Map<SubClusterId, ApplicationClientProtocol> clientRMProxies;
+  private FederationStateStoreFacade federationFacade;
+  private Random rand;
+  private RouterPolicyFacade policyFacade;
+
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+
+    federationFacade = FederationStateStoreFacade.getInstance();
+    rand = new Random(System.currentTimeMillis());
+
+    final Configuration conf = this.getConf();
+
+    try {
+      policyFacade = new RouterPolicyFacade(conf, federationFacade,
+          this.federationFacade.getSubClusterResolver(), null);
+    } catch (FederationPolicyInitializationException e) {
+      LOG.error(e.getMessage());
+    }
+
+    numSubmitRetries =
+        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+
+    clientRMProxies =
+        new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
+  }
+
+  @Override
+  public void setNextInterceptor(ClientRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+        + "FederationClientInterceptor, which should be the last one "
+        + "in the chain. Check if the interceptor pipeline configuration "
+        + "is correct");
+  }
+
+  @VisibleForTesting
+  protected ApplicationClientProtocol getClientRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    if (clientRMProxies.containsKey(subClusterId)) {
+      return clientRMProxies.get(subClusterId);
+    }
+
+    ApplicationClientProtocol clientRMProxy = null;
+    try {
+      clientRMProxy = user.doAs(
+          new PrivilegedExceptionAction<ApplicationClientProtocol>() {
+            @Override
+            public ApplicationClientProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(getConf(),
+                  ApplicationClientProtocol.class);
+            }
+          });
+    } catch (Exception e) {
+      RouterServerUtil.logAndThrowException(
+          "Unable to create the interface to reach the SubCluster "
+              + subClusterId,
+          e);
+    }
+
+    clientRMProxies.put(subClusterId, clientRMProxy);
+    return clientRMProxy;
+  }
+
+  private SubClusterId getRandomActiveSubCluster(
+      Map<SubClusterId, SubClusterInfo> activeSubclusters)
+      throws YarnException {
+
+    if (activeSubclusters == null || activeSubclusters.size() < 1) {
+      RouterServerUtil.logAndThrowException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
+    }
+    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+
+    return list.get(rand.nextInt(list.size()));
+  }
+
+  /**
+   * The Yarn Router forwards every getNewApplication request to one of the
+   * RMs. During this operation there is no communication with the State
+   * Store. The Router forwards the request to any SubCluster and retries it
+   * on up to {@code numSubmitRetries} different SubClusters, chosen at random
+   * from the active ones.
+   *
+   * Possible failures and behaviors:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will time out and resubmit.
+   *
+   * ResourceManager: the Router will time out and contact another RM.
+   *
+   * State Store: not involved in this call.
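+   *
+   * The number of attempts is governed by the retry setting below; a sketch
+   * of tuning it (the value shown is only an example):
+   *
+   * <pre>
+   * {@code
+   * // Try up to 5 randomly chosen active SubClusters before failing.
+   * conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY, 5);
+   * }
+   * </pre>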
+   */
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.debug(
+          "getNewApplication try #" + i + " on SubCluster " + subClusterId);
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+      GetNewApplicationResponse response = null;
+      try {
+        response = clientRMProxy.getNewApplication(request);
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new ApplicationId in SubCluster "
+            + subClusterId.getId(), e);
+      }
+
+      if (response != null) {
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this SubCluster for this request.
+        subClustersActive.remove(subClusterId);
+      }
+    }
+
+    String errMsg = "Failed to create a new application.";
+    LOG.error(errMsg);
+    throw new YarnException(errMsg);
+  }
+
+  /**
+   * Today, YARN does not validate the applicationId of a submitted
+   * application in any way.
+   *
+   * Base scenario:
+   *
+   * - The Client submits an application to the Router.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router inserts a tuple into the State Store with the selected
+   *   SubCluster (e.g. SC1) and the appId.
+   * - The State Store replies with the selected SubCluster (e.g. SC1).
+   * - The Router submits the request to the selected SubCluster.
+   *
+   * In case of State Store failure:
+   *
+   * - The Client submits an application to the Router.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router inserts a tuple into the State Store with the selected
+   *   SubCluster (e.g. SC1) and the appId.
+   * - Because the State Store is down, the Router times out and retries
+   *   depending on the FederationFacade settings.
+   * - The Router replies to the Client with an error message.
+   *
+   * If the State Store fails after inserting the tuple: identical behavior
+   * as {@code ClientRMService}.
+   *
+   * In case of Router failure:
+   *
+   * Scenario 1 - Crash before submission to the ResourceManager:
+   *
+   * - The Client submits an application to the Router.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router inserts a tuple into the State Store with the selected
+   *   SubCluster (e.g. SC1) and the appId.
+   * - The Router crashes.
+   * - The Client times out and resubmits the application.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router tries to insert a tuple into the State Store with the
+   *   selected SubCluster (e.g. SC2) and the appId.
+   * - Because the tuple is already in the State Store, it returns the
+   *   previously selected SubCluster (e.g. SC1).
+   * - The Router submits the request to the selected SubCluster (e.g. SC1).
+   *
+   * Scenario 2 - Crash after submission to the ResourceManager:
+   *
+   * - The Client submits an application to the Router.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router inserts a tuple into the State Store with the selected
+   *   SubCluster (e.g. SC1) and the appId.
+   * - The Router submits the request to the selected SubCluster.
+   * - The Router crashes.
+   * - The Client times out and resubmits the application.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router tries to insert a tuple into the State Store with the
+   *   selected SubCluster (e.g. SC2) and the appId.
+   * - The State Store replies with the previously selected SubCluster
+   *   (e.g. SC1).
+   * - The Router submits the request to the selected SubCluster (e.g. SC1).
+   *   When a client re-submits the same application to the same RM, the RM
+   *   does not raise an exception and replies that the operation succeeded.
+   *
+   * In case of Client failure: identical behavior as {@code ClientRMService}.
+   *
+   * In case of ResourceManager failure:
+   *
+   * - The Client submits an application to the Router.
+   * - The Router selects one SubCluster to forward the request.
+   * - The Router inserts a tuple into the State Store with the selected
+   *   SubCluster (e.g. SC1) and the appId.
+   * - The Router submits the request to the selected SubCluster.
+   * - The entire SubCluster is down: all the RMs in HA, or the master RM, are
+   *   not reachable.
+   * - The Router times out.
+   * - The Router selects a new SubCluster to forward the request.
+   * - The Router updates the tuple in the State Store with the selected
+   *   SubCluster (e.g. SC2) and the appId.
+   * - The State Store replies with an OK answer.
+   * - The Router submits the request to the selected SubCluster (e.g. SC2).
+   */
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    if (request == null || request.getApplicationSubmissionContext() == null
+        || request.getApplicationSubmissionContext()
+            .getApplicationId() == null) {
+      RouterServerUtil
+          .logAndThrowException("Missing submitApplication request or "
+              + "applicationSubmissionContext information.", null);
+    }
+
+    ApplicationId applicationId =
+        request.getApplicationSubmissionContext().getApplicationId();
+
+    List<SubClusterId> blacklist = new ArrayList<>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+
+      SubClusterId subClusterId = policyFacade.getHomeSubcluster(
+          request.getApplicationSubmissionContext(), blacklist);
+      LOG.info("submitApplication appId " + applicationId + " try #" + i
+          + " on SubCluster " + subClusterId);
+
+      ApplicationHomeSubCluster appHomeSubCluster =
+          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+
+      if (i == 0) {
+        try {
+          // Persist the mapping of applicationId to the subClusterId that has
+          // been selected as its home.
+          subClusterId =
+              federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String message = "Unable to insert the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          RouterServerUtil.logAndThrowException(message, e);
+        }
+      } else {
+        try {
+          // Update the mapping of applicationId to the new home subClusterId
+          // we have selected.
+          federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String message = "Unable to update the ApplicationId " + applicationId
+              + " in the FederationStateStore";
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getApplicationHomeSubCluster(applicationId);
+          if (subClusterId.equals(subClusterIdInStateStore)) {
+            LOG.info("Application " + applicationId
+                + " already submitted on SubCluster " + subClusterId);
+          } else {
+            RouterServerUtil.logAndThrowException(message, e);
+          }
+        }
+      }
+
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+
+      SubmitApplicationResponse response = null;
+      try {
+        response = clientRMProxy.submitApplication(request);
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the application " + applicationId
+            + " to SubCluster " + subClusterId.getId(), e);
+      }
+
+      if (response != null) {
+        LOG.info("Application " +
request.getApplicationSubmissionContext().getApplicationName() + + " with appId " + applicationId + " submitted on " + subClusterId); + return response; + } else { + // Empty response from the ResourceManager. + // Blacklist this subcluster for this request. + blacklist.add(subClusterId); + } + } + + String errMsg = "Application " + + request.getApplicationSubmissionContext().getApplicationName() + + " with appId " + applicationId + " failed to be submitted."; + LOG.error(errMsg); + throw new YarnException(errMsg); + } + + /** + * The Yarn Router will forward to the respective Yarn RM in which the AM is + * running. + * + * Possible failures and behaviors: + * + * Client: identical behavior as {@code ClientRMService}. + * + * Router: the Client will timeout and resubmit the request. + * + * ResourceManager: the Router will timeout and the call will fail. + * + * State Store: the Router will timeout and it will retry depending on the + * FederationFacade settings - if the failure happened before the select + * operation. + */ + @Override + public KillApplicationResponse forceKillApplication( + KillApplicationRequest request) throws YarnException, IOException { + + if (request == null || request.getApplicationId() == null) { + RouterServerUtil.logAndThrowException( + "Missing forceKillApplication request or ApplicationId.", null); + } + ApplicationId applicationId = request.getApplicationId(); + SubClusterId subClusterId = null; + + try { + subClusterId = federationFacade + .getApplicationHomeSubCluster(request.getApplicationId()); + } catch (YarnException e) { + RouterServerUtil.logAndThrowException("Application " + applicationId + + " does not exist in FederationStateStore", e); + } + + ApplicationClientProtocol clientRMProxy = + getClientRMProxyForSubCluster(subClusterId); + + KillApplicationResponse response = null; + try { + LOG.info("forceKillApplication " + applicationId + " on SubCluster " + + subClusterId); + response = clientRMProxy.forceKillApplication(request); + } catch (Exception e) { + LOG.error("Unable to kill the application report for " + + request.getApplicationId() + "to SubCluster " + + subClusterId.getId(), e); + throw e; + } + + if (response == null) { + LOG.error("No response when attempting to kill the application " + + applicationId + " to SubCluster " + subClusterId.getId()); + } + + return response; + } + + /** + * The Yarn Router will forward to the respective Yarn RM in which the AM is + * running. + * + * Possible failure: + * + * Client: identical behavior as {@code ClientRMService}. + * + * Router: the Client will timeout and resubmit the request. + * + * ResourceManager: the Router will timeout and the call will fail. + * + * State Store: the Router will timeout and it will retry depending on the + * FederationFacade settings - if the failure happened before the select + * operation. 
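+   *
+   * From the client side the call is transparent; a minimal sketch, assuming
+   * {@code client} is an ApplicationClientProtocol proxy whose address points
+   * at the Router (proxy creation and error handling omitted):
+   *
+   * <pre>
+   * {@code
+   * GetApplicationReportRequest req =
+   *     GetApplicationReportRequest.newInstance(appId);
+   * ApplicationReport report =
+   *     client.getApplicationReport(req).getApplicationReport();
+   * }
+   * </pre>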
+ */ + @Override + public GetApplicationReportResponse getApplicationReport( + GetApplicationReportRequest request) throws YarnException, IOException { + + if (request == null || request.getApplicationId() == null) { + RouterServerUtil.logAndThrowException( + "Missing getApplicationReport request or applicationId information.", + null); + } + + SubClusterId subClusterId = null; + + try { + subClusterId = federationFacade + .getApplicationHomeSubCluster(request.getApplicationId()); + } catch (YarnException e) { + RouterServerUtil + .logAndThrowException("Application " + request.getApplicationId() + + " does not exist in FederationStateStore", e); + } + + ApplicationClientProtocol clientRMProxy = + getClientRMProxyForSubCluster(subClusterId); + + GetApplicationReportResponse response = null; + try { + response = clientRMProxy.getApplicationReport(request); + } catch (Exception e) { + LOG.error("Unable to get the application report for " + + request.getApplicationId() + "to SubCluster " + + subClusterId.getId(), e); + throw e; + } + + if (response == null) { + LOG.error("No response when attempting to retrieve the report of " + + "the application " + request.getApplicationId() + " to SubCluster " + + subClusterId.getId()); + } + + return response; + } + + @Override + public GetApplicationsResponse getApplications(GetApplicationsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterMetricsResponse getClusterMetrics( + GetClusterMetricsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetQueueUserAclsInfoResponse getQueueUserAcls( + GetQueueUserAclsInfoRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues( + MoveApplicationAcrossQueuesRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetNewReservationResponse getNewReservation( + GetNewReservationRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationSubmissionResponse submitReservation( + ReservationSubmissionRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationListResponse listReservations( + ReservationListRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationUpdateResponse updateReservation( + ReservationUpdateRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public ReservationDeleteResponse deleteReservation( + ReservationDeleteRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetNodesToLabelsResponse getNodeToLabels( + GetNodesToLabelsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetLabelsToNodesResponse getLabelsToNodes( + 
GetLabelsToNodesRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetClusterNodeLabelsResponse getClusterNodeLabels( + GetClusterNodeLabelsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationAttemptReportResponse getApplicationAttemptReport( + GetApplicationAttemptReportRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetApplicationAttemptsResponse getApplicationAttempts( + GetApplicationAttemptsRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainerReportResponse getContainerReport( + GetContainerReportRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetContainersResponse getContainers(GetContainersRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public GetDelegationTokenResponse getDelegationToken( + GetDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public RenewDelegationTokenResponse renewDelegationToken( + RenewDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public CancelDelegationTokenResponse cancelDelegationToken( + CancelDelegationTokenRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public FailApplicationAttemptResponse failApplicationAttempt( + FailApplicationAttemptRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public UpdateApplicationPriorityResponse updateApplicationPriority( + UpdateApplicationPriorityRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public SignalContainerResponse signalToContainer( + SignalContainerRequest request) throws YarnException, IOException { + throw new NotImplementedException(); + } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java index 7e1508493ca..7fc4719bfb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java @@ -119,29 +119,41 @@ public abstract class BaseRouterClientRMTest { return this.clientrmService; } - @Before - public void setUp() { - this.conf = new YarnConfiguration(); + protected YarnConfiguration createConfiguration() { + YarnConfiguration config = new YarnConfiguration(); String mockPassThroughInterceptorClass = PassThroughClientRequestInterceptor.class.getName(); // 
Create a request intercepter pipeline for testing. The last one in the // chain will call the mock resource manager. The others in the chain will // simply forward it to the next one in the chain - this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + config.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + "," + MockClientRequestInterceptor.class.getName()); - this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, + config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE, TEST_MAX_CACHE_SIZE); + return config; + } + @Before + public void setUp() { + this.conf = createConfiguration(); this.dispatcher = new AsyncDispatcher(); this.dispatcher.init(conf); this.dispatcher.start(); this.clientrmService = createAndStartRouterClientRMService(); } + public void setUpConfig() { + this.conf = createConfiguration(); + } + + protected Configuration getConf() { + return this.conf; + } + @After public void tearDown() { if (clientrmService != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java new file mode 100644 index 00000000000..87dfc95cd9e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to + * use the {@code RouterClientRMService} pipeline test cases for testing the + * {@code FederationInterceptor} class. The tests for + * {@code RouterClientRMService} has been written cleverly so that it can be + * reused to validate different request intercepter chains. + */ +public class TestFederationClientInterceptor extends BaseRouterClientRMTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationClientInterceptor.class); + + private TestableFederationClientInterceptor interceptor; + private MemoryFederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreUtil; + private List subClusters; + + private String user = "test-user"; + + private final static int NUM_SUBCLUSTER = 4; + + @Override + public void setUp() { + super.setUpConfig(); + interceptor = new TestableFederationClientInterceptor(); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(this.getConf()); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, + getConf()); + stateStoreUtil = new FederationStateStoreTestUtil(stateStore); + + interceptor.setConf(this.getConf()); + interceptor.init(user); + + subClusters = new ArrayList(); + + try { + for (int i = 0; i < NUM_SUBCLUSTER; i++) { + SubClusterId sc = SubClusterId.newInstance(Integer.toString(i)); + stateStoreUtil.registerSubCluster(sc); + subClusters.add(sc); + } + } catch (YarnException e) { + LOG.error(e.getMessage()); + Assert.fail(); + } + + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + @Override + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. 
The last one in the + // chain is the federation intercepter that calls the mock resource manager. + // The others in the chain will simply forward it to the next one in the + // chain + conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + TestableFederationClientInterceptor.class.getName()); + + conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, + UniformBroadcastPolicyManager.class.getName()); + + // Disable StateStoreFacade cache + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + + return conf; + } + + /** + * This test validates the correctness of GetNewApplication. The return + * ApplicationId has to belong to one of the SubCluster in the cluster. + */ + @Test + public void testGetNewApplication() + throws YarnException, IOException, InterruptedException { + System.out.println("Test FederationClientInterceptor: Get New Application"); + + GetNewApplicationRequest request = GetNewApplicationRequest.newInstance(); + GetNewApplicationResponse response = interceptor.getNewApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(response.getApplicationId()); + Assert.assertTrue( + response.getApplicationId().getClusterTimestamp() < NUM_SUBCLUSTER); + Assert.assertTrue(response.getApplicationId().getClusterTimestamp() >= 0); + } + + /** + * This test validates the correctness of SubmitApplication. The application + * has to be submitted to one of the SubCluster in the cluster. + */ + @Test + public void testSubmitApplication() + throws YarnException, IOException, InterruptedException { + System.out.println("Test FederationClientInterceptor: Submit Application"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId); + Assert.assertNotNull(scIdResult); + Assert.assertTrue(subClusters.contains(scIdResult)); + } + + /** + * This test validates the correctness of SubmitApplication in case of + * multiple submission. The first retry has to be submitted to the same + * SubCluster of the first attempt. 
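+   *
+   * A sketch of the invariant exercised below ("SC-x" is just a placeholder
+   * for whichever SubCluster the policy picks first):
+   *
+   * <pre>
+   * {@code
+   * interceptor.submitApplication(request); // first attempt, lands on SC-x
+   * interceptor.submitApplication(request); // client retry, must stay on SC-x
+   * }
+   * </pre>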
+   */
+  @Test
+  public void testSubmitApplicationMultipleSubmission()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println(
+        "Test FederationClientInterceptor: Submit Application - Multiple");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContext context = ApplicationSubmissionContext
+        .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+    SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(context);
+
+    // First attempt
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult);
+
+    // First retry
+    response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult2);
+    Assert.assertEquals(scIdResult, scIdResult2);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of an
+   * empty request.
+   */
+  @Test
+  public void testSubmitApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println(
+        "Test FederationClientInterceptor: Submit Application - Empty");
+    try {
+      interceptor.submitApplication(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing submitApplication request or "
+              + "applicationSubmissionContext information."));
+    }
+    try {
+      interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing submitApplication request or "
+              + "applicationSubmissionContext information."));
+    }
+    try {
+      ApplicationSubmissionContext context = ApplicationSubmissionContext
+          .newInstance(null, "", "", null, null, false, false, -1, null, null);
+      SubmitApplicationRequest request =
+          SubmitApplicationRequest.newInstance(context);
+      interceptor.submitApplication(request);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing submitApplication request or "
+              + "applicationSubmissionContext information."));
+    }
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application exists in the cluster.
+ */ + @Test + public void testForceKillApplication() + throws YarnException, IOException, InterruptedException { + System.out + .println("Test FederationClientInterceptor: Force Kill Application"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + // Submit the application we are going to kill later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + KillApplicationRequest requestKill = + KillApplicationRequest.newInstance(appId); + KillApplicationResponse responseKill = + interceptor.forceKillApplication(requestKill); + Assert.assertNotNull(responseKill); + } + + /** + * This test validates the correctness of ForceKillApplication in case of + * application does not exist in StateStore. + */ + @Test + public void testForceKillApplicationNotExists() + throws YarnException, IOException, InterruptedException { + System.out.println("Test FederationClientInterceptor: " + + "Force Kill Application - Not Exists"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + KillApplicationRequest requestKill = + KillApplicationRequest.newInstance(appId); + try { + interceptor.forceKillApplication(requestKill); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().equals( + "Application " + appId + " does not exist in FederationStateStore")); + } + } + + /** + * This test validates the correctness of ForceKillApplication in case of + * empty request. + */ + @Test + public void testForceKillApplicationEmptyRequest() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test FederationClientInterceptor: Force Kill Application - Empty"); + try { + interceptor.forceKillApplication(null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith( + "Missing forceKillApplication request or ApplicationId.")); + } + try { + interceptor + .forceKillApplication(KillApplicationRequest.newInstance(null)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().startsWith( + "Missing forceKillApplication request or ApplicationId.")); + } + } + + /** + * This test validates the correctness of GetApplicationReport in case the + * application exists in the cluster. 
+ */ + @Test + public void testGetApplicationReport() + throws YarnException, IOException, InterruptedException { + System.out + .println("Test FederationClientInterceptor: Get Application Report"); + + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + + SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + // Submit the application we want the report later + SubmitApplicationResponse response = interceptor.submitApplication(request); + + Assert.assertNotNull(response); + Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId)); + + GetApplicationReportRequest requestGet = + GetApplicationReportRequest.newInstance(appId); + + GetApplicationReportResponse responseGet = + interceptor.getApplicationReport(requestGet); + + Assert.assertNotNull(responseGet); + } + + /** + * This test validates the correctness of GetApplicationReport in case the + * application does not exist in StateStore. + */ + @Test + public void testGetApplicationNotExists() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test ApplicationClientProtocol: Get Application Report - Not Exists"); + ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + GetApplicationReportRequest requestGet = + GetApplicationReportRequest.newInstance(appId); + try { + interceptor.getApplicationReport(requestGet); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().equals( + "Application " + appId + " does not exist in FederationStateStore")); + } + } + + /** + * This test validates the correctness of GetApplicationReport in case of + * empty request. + */ + @Test + public void testGetApplicationEmptyRequest() + throws YarnException, IOException, InterruptedException { + System.out.println( + "Test FederationClientInterceptor: Get Application Report - Empty"); + try { + interceptor.getApplicationReport(null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing getApplicationReport request or " + + "applicationId information.")); + } + try { + interceptor + .getApplicationReport(GetApplicationReportRequest.newInstance(null)); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue( + e.getMessage().startsWith("Missing getApplicationReport request or " + + "applicationId information.")); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java new file mode 100644 index 00000000000..a655c163a67 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.router.clientrm; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; +import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; +import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; +import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to + * use the {@code RouterClientRMService} pipeline test cases for testing the + * {@code FederationInterceptor} class. The tests for + * {@code RouterClientRMService} has been written cleverly so that it can be + * reused to validate different request intercepter chains. + * + * It tests the case with SubClusters down and the Router logic of retries. We + * have 1 good SubCluster and 2 bad ones for all the tests. 
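+ *
+ * The per-test pattern is sketched below (names as declared in this class;
+ * a "bad" SubCluster is registered in the StateStore but its mock RM is
+ * never started):
+ *
+ * <pre>
+ * {@code
+ * interceptor.registerBadSubCluster(bad1); // simulate a SubCluster down
+ * setupCluster(Arrays.asList(good, bad1)); // register SCs in the StateStore
+ * interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+ * }
+ * </pre>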
+ */ +public class TestFederationClientInterceptorRetry + extends BaseRouterClientRMTest { + private static final Logger LOG = + LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class); + + private TestableFederationClientInterceptor interceptor; + private MemoryFederationStateStore stateStore; + private FederationStateStoreTestUtil stateStoreUtil; + + private String user = "test-user"; + + // running and registered + private static SubClusterId good; + + // registered but not running + private static SubClusterId bad1; + private static SubClusterId bad2; + + private static List scs = new ArrayList(); + + @Override + public void setUp() { + super.setUpConfig(); + interceptor = new TestableFederationClientInterceptor(); + + stateStore = new MemoryFederationStateStore(); + stateStore.init(this.getConf()); + FederationStateStoreFacade.getInstance().reinitialize(stateStore, + getConf()); + stateStoreUtil = new FederationStateStoreTestUtil(stateStore); + + interceptor.setConf(this.getConf()); + interceptor.init(user); + + // Create SubClusters + good = SubClusterId.newInstance("0"); + bad1 = SubClusterId.newInstance("1"); + bad2 = SubClusterId.newInstance("2"); + scs.add(good); + scs.add(bad1); + scs.add(bad2); + + // The mock RM will not start in these SubClusters, this is done to simulate + // a SubCluster down + + interceptor.registerBadSubCluster(bad1); + interceptor.registerBadSubCluster(bad2); + } + + @Override + public void tearDown() { + interceptor.shutdown(); + super.tearDown(); + } + + private void setupCluster(List scsToRegister) + throws YarnException { + + try { + // Clean up the StateStore before every test + stateStoreUtil.deregisterAllSubClusters(); + + for (SubClusterId sc : scsToRegister) { + stateStoreUtil.registerSubCluster(sc); + } + } catch (YarnException e) { + LOG.error(e.getMessage()); + Assert.fail(); + } + } + + @Override + protected YarnConfiguration createConfiguration() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true); + String mockPassThroughInterceptorClass = + PassThroughClientRequestInterceptor.class.getName(); + + // Create a request intercepter pipeline for testing. The last one in the + // chain is the federation intercepter that calls the mock resource manager. + // The others in the chain will simply forward it to the next one in the + // chain + conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE, + mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass + + "," + TestableFederationClientInterceptor.class.getName()); + + conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER, + UniformBroadcastPolicyManager.class.getName()); + + // Disable StateStoreFacade cache + conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0); + + return conf; + } + + /** + * This test validates the correctness of GetNewApplication in case the + * cluster is composed of only 1 bad SubCluster. 
+ */ + @Test + public void testGetNewApplicationOneBadSC() + throws YarnException, IOException, InterruptedException { + + System.out.println("Test getNewApplication with one bad SubCluster"); + setupCluster(Arrays.asList(bad2)); + + try { + interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of GetNewApplication in case the + * cluster is composed of only 2 bad SubClusters. + */ + @Test + public void testGetNewApplicationTwoBadSCs() + throws YarnException, IOException, InterruptedException { + System.out.println("Test getNewApplication with two bad SubClusters"); + setupCluster(Arrays.asList(bad1, bad2)); + + try { + interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of GetNewApplication in case the + * cluster is composed of only 1 bad SubCluster and 1 good one. + */ + @Test + public void testGetNewApplicationOneBadOneGood() + throws YarnException, IOException, InterruptedException { + System.out.println("Test getNewApplication with one bad, one good SC"); + setupCluster(Arrays.asList(good, bad2)); + GetNewApplicationResponse response = null; + try { + response = + interceptor.getNewApplication(GetNewApplicationRequest.newInstance()); + } catch (Exception e) { + Assert.fail(); + } + Assert.assertEquals(Integer.parseInt(good.getId()), + response.getApplicationId().getClusterTimestamp()); + } + + /** + * This test validates the correctness of SubmitApplication in case the + * cluster is composed of only 1 bad SubCluster. + */ + @Test + public void testSubmitApplicationOneBadSC() + throws YarnException, IOException, InterruptedException { + + System.out.println("Test submitApplication with one bad SubCluster"); + setupCluster(Arrays.asList(bad2)); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + final SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + try { + interceptor.submitApplication(request); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of SubmitApplication in case the + * cluster is composed of only 2 bad SubClusters. 
+ */ + @Test + public void testSubmitApplicationTwoBadSCs() + throws YarnException, IOException, InterruptedException { + System.out.println("Test submitApplication with two bad SubClusters"); + setupCluster(Arrays.asList(bad1, bad2)); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + final SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + try { + interceptor.submitApplication(request); + Assert.fail(); + } catch (Exception e) { + System.out.println(e.toString()); + Assert.assertTrue(e.getMessage() + .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE)); + } + } + + /** + * This test validates the correctness of SubmitApplication in case the + * cluster is composed of only 1 bad SubCluster and a good one. + */ + @Test + public void testSubmitApplicationOneBadOneGood() + throws YarnException, IOException, InterruptedException { + System.out.println("Test submitApplication with one bad, one good SC"); + setupCluster(Arrays.asList(good, bad2)); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + + ApplicationSubmissionContext context = ApplicationSubmissionContext + .newInstance(appId, "", "", null, null, false, false, -1, null, null); + final SubmitApplicationRequest request = + SubmitApplicationRequest.newInstance(context); + try { + interceptor.submitApplication(request); + } catch (Exception e) { + Assert.fail(); + } + Assert.assertEquals(good, + stateStore + .getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest.newInstance(appId)) + .getApplicationHomeSubCluster().getHomeSubCluster()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java new file mode 100644 index 00000000000..e4a1a42bd64 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * Extends the FederationClientInterceptor and overrides methods to provide a
+ * testable implementation of FederationClientInterceptor.
+ */
+public class TestableFederationClientInterceptor
+    extends FederationClientInterceptor {
+
+  private ConcurrentHashMap<SubClusterId, MockResourceManagerFacade> mockRMs =
+      new ConcurrentHashMap<>();
+
+  private List<SubClusterId> badSubCluster = new ArrayList<>();
+
+  @Override
+  protected ApplicationClientProtocol getClientRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    MockResourceManagerFacade mockRM = null;
+    synchronized (this) {
+      if (mockRMs.containsKey(subClusterId)) {
+        mockRM = mockRMs.get(subClusterId);
+      } else {
+        mockRM = new MockResourceManagerFacade(super.getConf(), 0,
+            Integer.parseInt(subClusterId.getId()),
+            !badSubCluster.contains(subClusterId));
+        mockRMs.put(subClusterId, mockRM);
+      }
+      return mockRM;
+    }
+  }
+
+  /**
+   * For testing purposes, some SubClusters have to be down to simulate
+   * particular scenarios such as RM failover or network issues. For this
+   * reason we keep track of these bad SubClusters. This method makes the
+   * SubCluster unusable.
+   *
+   * @param badSC the SubCluster to make unusable
+   */
+  protected void registerBadSubCluster(SubClusterId badSC) {
+    badSubCluster.add(badSC);
+    if (mockRMs.containsKey(badSC)) {
+      mockRMs.get(badSC).setRunningMode(false);
+    }
+  }
+
+}