YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).

This commit is contained in:
Subru Krishnan 2017-06-26 13:27:26 -07:00 committed by Carlo Curino
parent f8e5de5969
commit 52daa6d971
23 changed files with 1866 additions and 39 deletions

View File

@ -2674,6 +2674,14 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.rmadmin."
+ "DefaultRMAdminRequestInterceptor";
/**
* The number of retries for GetNewApplication and SubmitApplication in
* {@code FederationClientInterceptor}.
*/
public static final String ROUTER_CLIENTRM_SUBMIT_RETRY =
ROUTER_PREFIX + "submit.retry";
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -136,6 +136,11 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
// Ignore all Router Federation variables
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

View File

@ -51,7 +51,7 @@ public final class FederationPolicyInitializationContextValidator {
if (policyContext.getFederationSubclusterResolver() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacase provided is null. Cannot"
"The FederationSubclusterResolver provided is null. Cannot"
+ " reinitalize successfully.");
}

View File

@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.federation.policies;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -41,6 +43,9 @@ public final class FederationPolicyUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FederationPolicyUtils.class);
public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
"No active SubCluster available to submit the request.";
/** Disable constructor. */
private FederationPolicyUtils() {
}
@ -165,4 +170,34 @@ public final class FederationPolicyUtils {
return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
}
/**
* Validate if there is any active subcluster that is not blacklisted, it will
* throw an exception if there are no usable subclusters.
*
* @param activeSubClusters the list of subClusters as identified by
* {@link SubClusterId} currently active.
* @param blackListSubClusters the list of subClusters as identified by
* {@link SubClusterId} to blackList from the selection of the home
* subCluster.
* @throws FederationPolicyException if there are no usable subclusters.
*/
public static void validateSubClusterAvailability(
List<SubClusterId> activeSubClusters,
List<SubClusterId> blackListSubClusters)
throws FederationPolicyException {
if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
if (blackListSubClusters == null) {
return;
}
for (SubClusterId scId : activeSubClusters) {
if (!blackListSubClusters.contains(scId)) {
// There is at least one active subcluster
return;
}
}
}
throw new FederationPolicyException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -56,7 +57,7 @@ public class RouterPolicyFacade {
@VisibleForTesting
Map<String, FederationRouterPolicy> globalPolicyMap;
public RouterPolicyFacade(YarnConfiguration conf,
public RouterPolicyFacade(Configuration conf,
FederationStateStoreFacade facade, SubClusterResolver resolver,
SubClusterId homeSubcluster)
throws FederationPolicyInitializationException {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -76,6 +77,10 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()),
blackListSubClusters);
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones

View File

@ -17,12 +17,14 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -72,6 +74,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
Map<SubClusterIdInfo, Float> weights =
getPolicyInfo().getRouterPolicyWeights();
SubClusterIdInfo chosen = null;

View File

@ -17,11 +17,13 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -44,6 +46,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
// This finds the sub-cluster with the highest weight among the
// currently active ones.
Map<SubClusterIdInfo, Float> weights =

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -86,6 +87,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
FederationPolicyUtils.validateSubClusterAvailability(list,
blackListSubClusters);
if (blackListSubClusters != null) {
// Remove from the active SubClusters from StateStore the blacklisted ones

View File

@ -18,12 +18,14 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@ -51,6 +53,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
FederationPolicyUtils.validateSubClusterAvailability(
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
// note: we cannot pre-compute the weights, as the set of activeSubcluster
// changes dynamically (and this would unfairly spread the load to
// sub-clusters adjacent to an inactive one), hence we need to count/scan

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -154,6 +155,7 @@ import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
/**
@ -175,6 +177,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
new HashMap<ContainerId, Container>();
private AtomicInteger containerIndex = new AtomicInteger(0);
private Configuration conf;
private int subClusterId;
final private AtomicInteger applicationCounter = new AtomicInteger(0);
// True if the Mock RM is running, false otherwise.
// This property allows us to write tests for specific scenario as Yarn RM
// down e.g. network issue, failover.
private boolean isRunning;
private boolean shouldReRegisterNext = false;
@ -187,14 +196,25 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MockResourceManagerFacade(Configuration conf,
int startContainerIndex) {
this(conf, startContainerIndex, 0, true);
}
public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
int subClusterId, boolean isRunning) {
this.conf = conf;
this.containerIndex.set(startContainerIndex);
this.subClusterId = subClusterId;
this.isRunning = isRunning;
}
public void setShouldReRegisterNext() {
shouldReRegisterNext = true;
}
public void setRunningMode(boolean mode) {
this.isRunning = mode;
}
private static String getAppIdentifier() throws IOException {
AMRMTokenIdentifier result = null;
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
@ -208,10 +228,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
return result != null ? result.getApplicationAttemptId().toString() : "";
}
private void validateRunning() throws ConnectException {
if (!isRunning) {
throw new ConnectException("RM is stopped");
}
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException {
validateRunning();
String amrmToken = getAppIdentifier();
LOG.info("Registering application attempt: " + amrmToken);
@ -248,6 +277,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException {
validateRunning();
String amrmToken = getAppIdentifier();
LOG.info("Finishing application attempt: " + amrmToken);
@ -284,6 +316,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
validateRunning();
if (request.getAskList() != null && request.getAskList().size() > 0
&& request.getReleaseList() != null
&& request.getReleaseList().size() > 0) {
@ -391,6 +426,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
validateRunning();
GetApplicationReportResponse response =
Records.newRecord(GetApplicationReportResponse.class);
ApplicationReport report = Records.newRecord(ApplicationReport.class);
@ -407,6 +444,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
GetApplicationAttemptReportRequest request)
throws YarnException, IOException {
validateRunning();
GetApplicationAttemptReportResponse response =
Records.newRecord(GetApplicationAttemptReportResponse.class);
ApplicationAttemptReport report =
@ -420,12 +459,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
return GetNewApplicationResponse.newInstance(null, null, null);
validateRunning();
return GetNewApplicationResponse.newInstance(ApplicationId.newInstance(
subClusterId, applicationCounter.incrementAndGet()), null, null);
}
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {
validateRunning();
ApplicationId appId = null;
if (request.getApplicationSubmissionContext() != null) {
appId = request.getApplicationSubmissionContext().getApplicationId();
@ -438,6 +484,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
validateRunning();
ApplicationId appId = null;
if (request.getApplicationId() != null) {
appId = request.getApplicationId();
@ -453,48 +502,72 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException {
validateRunning();
return GetClusterMetricsResponse.newInstance(null);
}
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
validateRunning();
return GetApplicationsResponse.newInstance(null);
}
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
validateRunning();
return GetClusterNodesResponse.newInstance(null);
}
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException, IOException {
validateRunning();
return GetQueueInfoResponse.newInstance(null);
}
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
validateRunning();
return GetQueueUserAclsInfoResponse.newInstance(null);
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
validateRunning();
return GetDelegationTokenResponse.newInstance(null);
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
validateRunning();
return RenewDelegationTokenResponse.newInstance(0);
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
validateRunning();
return CancelDelegationTokenResponse.newInstance();
}
@ -502,36 +575,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request)
throws YarnException, IOException {
validateRunning();
return MoveApplicationAcrossQueuesResponse.newInstance();
}
@Override
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException {
validateRunning();
return GetApplicationAttemptsResponse.newInstance(null);
}
@Override
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException {
validateRunning();
return GetContainerReportResponse.newInstance(null);
}
@Override
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
validateRunning();
return GetContainersResponse.newInstance(null);
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
validateRunning();
return ReservationSubmissionResponse.newInstance();
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
validateRunning();
return ReservationListResponse
.newInstance(new ArrayList<ReservationAllocationState>());
}
@ -539,18 +630,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
validateRunning();
return ReservationUpdateResponse.newInstance();
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
validateRunning();
return ReservationDeleteResponse.newInstance();
}
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {
validateRunning();
return GetNodesToLabelsResponse
.newInstance(new HashMap<NodeId, Set<String>>());
}
@ -558,18 +658,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
validateRunning();
return GetClusterNodeLabelsResponse.newInstance(new ArrayList<NodeLabel>());
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException {
validateRunning();
return GetLabelsToNodesResponse.newInstance(null);
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
validateRunning();
return GetNewReservationResponse
.newInstance(ReservationId.newInstance(0, 0));
}
@ -577,6 +686,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
validateRunning();
return FailApplicationAttemptResponse.newInstance();
}
@ -584,12 +696,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request)
throws YarnException, IOException {
validateRunning();
return UpdateApplicationPriorityResponse.newInstance(null);
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
validateRunning();
return new SignalContainerResponsePBImpl();
}
@ -597,18 +715,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException {
validateRunning();
return UpdateApplicationTimeoutsResponse.newInstance();
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws StandbyException, YarnException, IOException {
validateRunning();
return RefreshQueuesResponse.newInstance();
}
@Override
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws StandbyException, YarnException, IOException {
validateRunning();
return RefreshNodesResponse.newInstance();
}
@ -616,6 +743,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws StandbyException, YarnException, IOException {
validateRunning();
return RefreshSuperUserGroupsConfigurationResponse.newInstance();
}
@ -623,36 +753,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws StandbyException, YarnException, IOException {
validateRunning();
return RefreshUserToGroupsMappingsResponse.newInstance();
}
@Override
public RefreshAdminAclsResponse refreshAdminAcls(
RefreshAdminAclsRequest request) throws YarnException, IOException {
validateRunning();
return RefreshAdminAclsResponse.newInstance();
}
@Override
public RefreshServiceAclsResponse refreshServiceAcls(
RefreshServiceAclsRequest request) throws YarnException, IOException {
validateRunning();
return RefreshServiceAclsResponse.newInstance();
}
@Override
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request) throws YarnException, IOException {
validateRunning();
return UpdateNodeResourceResponse.newInstance();
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(
RefreshNodesResourcesRequest request) throws YarnException, IOException {
validateRunning();
return RefreshNodesResourcesResponse.newInstance();
}
@Override
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
validateRunning();
return AddToClusterNodeLabelsResponse.newInstance();
}
@ -660,12 +808,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
RemoveFromClusterNodeLabelsRequest request)
throws YarnException, IOException {
validateRunning();
return RemoveFromClusterNodeLabelsResponse.newInstance();
}
@Override
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
validateRunning();
return ReplaceLabelsOnNodeResponse.newInstance();
}
@ -673,6 +827,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
validateRunning();
return CheckForDecommissioningNodesResponse.newInstance(null);
}
@ -680,11 +837,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request)
throws YarnException, IOException {
validateRunning();
return RefreshClusterMaxPriorityResponse.newInstance();
}
@Override
public String[] getGroupsForUser(String user) throws IOException {
validateRunning();
return new String[0];
}
}

View File

@ -26,6 +26,7 @@ import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@ -87,4 +88,31 @@ public abstract class BaseRouterPoliciesTest
Assert.assertEquals(removed, chosen);
}
}
/**
* This test validates the correctness of blacklist logic in case the cluster
* has no active subclusters.
*/
@Test
public void testAllBlacklistSubcluster() throws YarnException {
FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
false, false, 0, Resources.none(), null, false, null, null);
Map<SubClusterId, SubClusterInfo> activeSubClusters =
getActiveSubclusters();
if (activeSubClusters != null && activeSubClusters.size() > 1
&& !(localPolicy instanceof RejectRouterPolicy)) {
List<SubClusterId> blacklistSubclusters =
new ArrayList<SubClusterId>(activeSubClusters.keySet());
try {
localPolicy.getHomeSubcluster(applicationSubmissionContext,
blacklistSubclusters);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
}
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
@ -69,7 +70,7 @@ public class FederationStateStoreTestUtil {
SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
}
private void registerSubCluster(SubClusterId subClusterId)
public void registerSubCluster(SubClusterId subClusterId)
throws YarnException {
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
@ -164,4 +165,16 @@ public class FederationStateStoreTestUtil {
return result.getPolicyConfiguration();
}
public void deregisterAllSubClusters() throws YarnException {
for (SubClusterId sc : getAllSubClusterIds(true)) {
deRegisterSubCluster(sc);
}
}
private void deRegisterSubCluster(SubClusterId subClusterId)
throws YarnException {
stateStore.deregisterSubCluster(SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
}
}

View File

@ -55,6 +55,11 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Common utility methods used by the Router server.
*
*/
@Private
@Unstable
public final class RouterServerUtil {
/** Disable constructor. */
private RouterServerUtil() {
}
public static final Logger LOG =
LoggerFactory.getLogger(RouterServerUtil.class);
/**
* Throws an exception due to an error.
*
* @param errMsg the error message
* @param t the throwable raised in the called class.
* @throws YarnException on failure
*/
@Public
@Unstable
public static void logAndThrowException(String errMsg, Throwable t)
throws YarnException {
if (t != null) {
LOG.error(errMsg, t);
throw new YarnException(errMsg, t);
} else {
LOG.error(errMsg);
throw new YarnException(errMsg);
}
}
}

View File

@ -18,7 +18,13 @@
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements the {@link ClientRequestInterceptor} interface and provides common
@ -28,9 +34,16 @@ import org.apache.hadoop.conf.Configuration;
*/
public abstract class AbstractClientRequestInterceptor
implements ClientRequestInterceptor {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractClientRequestInterceptor.class);
private Configuration conf;
private ClientRequestInterceptor nextInterceptor;
@SuppressWarnings("checkstyle:visibilitymodifier")
protected UserGroupInformation user = null;
/**
* Sets the {@link ClientRequestInterceptor} in the chain.
*/
@ -63,9 +76,10 @@ public abstract class AbstractClientRequestInterceptor
* Initializes the {@link ClientRequestInterceptor}.
*/
@Override
public void init(String user) {
public void init(String userName) {
setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(user);
this.nextInterceptor.init(userName);
}
}
@ -87,4 +101,27 @@ public abstract class AbstractClientRequestInterceptor
return this.nextInterceptor;
}
private void setupUser(String userName) {
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
} catch (IOException e) {
String message = "Error while creating Router ClientRM Service for user:";
if (user != null) {
message += ", user: " + user;
}
LOG.info(message);
throw new YarnRuntimeException(message, e);
}
}
}

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@ -85,8 +84,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRespo
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@ -98,27 +95,14 @@ import com.google.common.annotations.VisibleForTesting;
*/
public class DefaultClientRequestInterceptor
extends AbstractClientRequestInterceptor {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
private ApplicationClientProtocol clientRMProxy;
private UserGroupInformation user = null;
@Override
public void init(String userName) {
super.init(userName);
try {
// Do not create a proxy user if user name matches the user name on
// current UGI
if (userName.equalsIgnoreCase(
UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
final Configuration conf = this.getConf();
try {
clientRMProxy =
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
@ -127,16 +111,9 @@ public class DefaultClientRequestInterceptor
ApplicationClientProtocol.class);
}
});
} catch (IOException e) {
String message = "Error while creating Router ClientRM Service for user:";
if (user != null) {
message += ", user: " + user;
}
LOG.info(message);
throw new YarnRuntimeException(message, e);
} catch (Exception e) {
throw new YarnRuntimeException(e);
throw new YarnRuntimeException(
"Unable to create the interface to reach the YarnRM", e);
}
}

View File

@ -0,0 +1,684 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
* implementation for federation of YARN RM and scaling an application across
* multiple YARN SubClusters. All the federation specific implementation is
* encapsulated in this class. This is always the last intercepter in the chain.
*/
public class FederationClientInterceptor
extends AbstractClientRequestInterceptor {
/*
* TODO YARN-6740 Federation Router (hiding multiple RMs for
* ApplicationClientProtocol) phase 2.
*
* The current implementation finalized the main 4 calls (getNewApplication,
* submitApplication, forceKillApplication and getApplicationReport). Those
* allow us to execute applications E2E.
*/
private static final Logger LOG =
LoggerFactory.getLogger(FederationClientInterceptor.class);
private int numSubmitRetries;
private Map<SubClusterId, ApplicationClientProtocol> clientRMProxies;
private FederationStateStoreFacade federationFacade;
private Random rand;
private RouterPolicyFacade policyFacade;
@Override
public void init(String userName) {
super.init(userName);
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis());
final Configuration conf = this.getConf();
try {
policyFacade = new RouterPolicyFacade(conf, federationFacade,
this.federationFacade.getSubClusterResolver(), null);
} catch (FederationPolicyInitializationException e) {
LOG.error(e.getMessage());
}
numSubmitRetries =
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
}
@Override
public void setNextInterceptor(ClientRequestInterceptor next) {
throw new YarnRuntimeException("setNextInterceptor is being called on "
+ "FederationClientRequestInterceptor, which should be the last one "
+ "in the chain. Check if the interceptor pipeline configuration "
+ "is correct");
}
@VisibleForTesting
protected ApplicationClientProtocol getClientRMProxyForSubCluster(
SubClusterId subClusterId) throws YarnException {
if (clientRMProxies.containsKey(subClusterId)) {
return clientRMProxies.get(subClusterId);
}
ApplicationClientProtocol clientRMProxy = null;
try {
clientRMProxy =
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
@Override
public ApplicationClientProtocol run() throws Exception {
return ClientRMProxy.createRMProxy(getConf(),
ApplicationClientProtocol.class);
}
});
} catch (Exception e) {
RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster "
+ subClusterId,
e);
}
clientRMProxies.put(subClusterId, clientRMProxy);
return clientRMProxy;
}
private SubClusterId getRandomActiveSubCluster(
Map<SubClusterId, SubClusterInfo> activeSubclusters)
throws YarnException {
if (activeSubclusters == null || activeSubclusters.size() < 1) {
RouterServerUtil.logAndThrowException(
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
}
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
return list.get(rand.nextInt(list.size()));
}
/**
* Yarn Router forwards every getNewApplication requests to any RM. During
* this operation there will be no communication with the State Store. The
* Router will forward the requests to any SubCluster. The Router will retry
* to submit the request on #numSubmitRetries different SubClusters. The
* SubClusters are randomly chosen from the active ones.
*
* Possible failures and behaviors:
*
* Client: identical behavior as {@code ClientRMService}.
*
* Router: the Client will timeout and resubmit.
*
* ResourceManager: the Router will timeout and contacts another RM.
*
* StateStore: not in the execution.
*/
@Override
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws YarnException, IOException {
Map<SubClusterId, SubClusterInfo> subClustersActive =
federationFacade.getSubClusters(true);
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
LOG.debug(
"getNewApplication try #" + i + " on SubCluster " + subClusterId);
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
try {
response = clientRMProxy.getNewApplication(request);
} catch (Exception e) {
LOG.warn("Unable to create a new ApplicationId in SubCluster "
+ subClusterId.getId(), e);
}
if (response != null) {
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
subClustersActive.remove(subClusterId);
}
}
String errMsg = "Fail to create a new application.";
LOG.error(errMsg);
throw new YarnException(errMsg);
}
/**
* Today, in YARN there are no checks of any applicationId submitted.
*
* Base scenarios:
*
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into
* StateStore with the selected SubCluster (e.g. SC1) and the appId. The
* State Store replies with the selected SubCluster (e.g. SC1). The Router
* submits the request to the selected SubCluster.
*
* In case of State Store failure:
*
* The client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. Due to the
* State Store down the Router times out and it will retry depending on the
* FederationFacade settings. The Router replies to the client with an error
* message.
*
* If State Store fails after inserting the tuple: identical behavior as
* {@code ClientRMService}.
*
* In case of Router failure:
*
* Scenario 1 Crash before submission to the ResourceManager
*
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. The Router
* crashes. The Client timeouts and resubmits the application. The Router
* selects one SubCluster to forward the request. The Router inserts a tuple
* into State Store with the selected SubCluster (e.g. SC2) and the appId.
* Because the tuple is already inserted in the State Store, it returns the
* previous selected SubCluster (e.g. SC1). The Router submits the request
* to the selected SubCluster (e.g. SC1).
*
* Scenario 2 Crash after submission to the ResourceManager
*
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. The Router
* submits the request to the selected SubCluster. The Router crashes. The
* Client timeouts and resubmit the application. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC2) and the appId. The State
* Store replies with the selected SubCluster (e.g. SC1). The Router submits
* the request to the selected SubCluster (e.g. SC1). When a client re-submits
* the same application to the same RM, it does not raise an exception and
* replies with operation successful message.
*
* In case of Client failure: identical behavior as {@code ClientRMService}.
*
* In case of ResourceManager failure:
*
* The Client submits an application to the Router. The Router selects one
* SubCluster to forward the request. The Router inserts a tuple into State
* Store with the selected SubCluster (e.g. SC1) and the appId. The Router
* submits the request to the selected SubCluster. The entire SubCluster is
* down all the RMs in HA or the master RM is not reachable. The Router
* times out. The Router selects a new SubCluster to forward the request.
* The Router update a tuple into State Store with the selected SubCluster
* (e.g. SC2) and the appId. The State Store replies with OK answer. The
* Router submits the request to the selected SubCluster (e.g. SC2).
*/
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException, IOException {
if (request == null || request.getApplicationSubmissionContext() == null
|| request.getApplicationSubmissionContext()
.getApplicationId() == null) {
RouterServerUtil
.logAndThrowException("Missing submitApplication request or "
+ "applicationSubmissionContex information.", null);
}
ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();
List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = policyFacade.getHomeSubcluster(
request.getApplicationSubmissionContext(), blacklist);
LOG.info("submitApplication appId" + applicationId + " try #" + i
+ " on SubCluster " + subClusterId);
ApplicationHomeSubCluster appHomeSubCluster =
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
if (i == 0) {
try {
// persist the mapping of applicationId and the subClusterId which has
// been selected as its home
subClusterId =
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String message = "Unable to insert the ApplicationId " + applicationId
+ " into the FederationStateStore";
RouterServerUtil.logAndThrowException(message, e);
}
} else {
try {
// update the mapping of applicationId and the home subClusterId to
// the new subClusterId we have selected
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
} catch (YarnException e) {
String message = "Unable to update the ApplicationId " + applicationId
+ " into the FederationStateStore";
SubClusterId subClusterIdInStateStore =
federationFacade.getApplicationHomeSubCluster(applicationId);
if (subClusterId == subClusterIdInStateStore) {
LOG.info("Application " + applicationId
+ " already submitted on SubCluster " + subClusterId);
} else {
RouterServerUtil.logAndThrowException(message, e);
}
}
}
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
SubmitApplicationResponse response = null;
try {
response = clientRMProxy.submitApplication(request);
} catch (Exception e) {
LOG.warn("Unable to submit the application " + applicationId
+ "to SubCluster " + subClusterId.getId(), e);
}
if (response != null) {
LOG.info("Application "
+ request.getApplicationSubmissionContext().getApplicationName()
+ " with appId " + applicationId + " submitted on " + subClusterId);
return response;
} else {
// Empty response from the ResourceManager.
// Blacklist this subcluster for this request.
blacklist.add(subClusterId);
}
}
String errMsg = "Application "
+ request.getApplicationSubmissionContext().getApplicationName()
+ " with appId " + applicationId + " failed to be submitted.";
LOG.error(errMsg);
throw new YarnException(errMsg);
}
/**
* The Yarn Router will forward to the respective Yarn RM in which the AM is
* running.
*
* Possible failures and behaviors:
*
* Client: identical behavior as {@code ClientRMService}.
*
* Router: the Client will timeout and resubmit the request.
*
* ResourceManager: the Router will timeout and the call will fail.
*
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnException, IOException {
if (request == null || request.getApplicationId() == null) {
RouterServerUtil.logAndThrowException(
"Missing forceKillApplication request or ApplicationId.", null);
}
ApplicationId applicationId = request.getApplicationId();
SubClusterId subClusterId = null;
try {
subClusterId = federationFacade
.getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) {
RouterServerUtil.logAndThrowException("Application " + applicationId
+ " does not exist in FederationStateStore", e);
}
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
KillApplicationResponse response = null;
try {
LOG.info("forceKillApplication " + applicationId + " on SubCluster "
+ subClusterId);
response = clientRMProxy.forceKillApplication(request);
} catch (Exception e) {
LOG.error("Unable to kill the application report for "
+ request.getApplicationId() + "to SubCluster "
+ subClusterId.getId(), e);
throw e;
}
if (response == null) {
LOG.error("No response when attempting to kill the application "
+ applicationId + " to SubCluster " + subClusterId.getId());
}
return response;
}
/**
* The Yarn Router will forward to the respective Yarn RM in which the AM is
* running.
*
* Possible failure:
*
* Client: identical behavior as {@code ClientRMService}.
*
* Router: the Client will timeout and resubmit the request.
*
* ResourceManager: the Router will timeout and the call will fail.
*
* State Store: the Router will timeout and it will retry depending on the
* FederationFacade settings - if the failure happened before the select
* operation.
*/
@Override
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnException, IOException {
if (request == null || request.getApplicationId() == null) {
RouterServerUtil.logAndThrowException(
"Missing getApplicationReport request or applicationId information.",
null);
}
SubClusterId subClusterId = null;
try {
subClusterId = federationFacade
.getApplicationHomeSubCluster(request.getApplicationId());
} catch (YarnException e) {
RouterServerUtil
.logAndThrowException("Application " + request.getApplicationId()
+ " does not exist in FederationStateStore", e);
}
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetApplicationReportResponse response = null;
try {
response = clientRMProxy.getApplicationReport(request);
} catch (Exception e) {
LOG.error("Unable to get the application report for "
+ request.getApplicationId() + "to SubCluster "
+ subClusterId.getId(), e);
throw e;
}
if (response == null) {
LOG.error("No response when attempting to retrieve the report of "
+ "the application " + request.getApplicationId() + " to SubCluster "
+ subClusterId.getId());
}
return response;
}
@Override
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
MoveApplicationAcrossQueuesRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetNewReservationResponse getNewReservation(
GetNewReservationRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public ReservationSubmissionResponse submitReservation(
ReservationSubmissionRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public ReservationListResponse listReservations(
ReservationListRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public ReservationUpdateResponse updateReservation(
ReservationUpdateRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public ReservationDeleteResponse deleteReservation(
ReservationDeleteRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetNodesToLabelsResponse getNodeToLabels(
GetNodesToLabelsRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetLabelsToNodesResponse getLabelsToNodes(
GetLabelsToNodesRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetClusterNodeLabelsResponse getClusterNodeLabels(
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
GetApplicationAttemptReportRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetApplicationAttemptsResponse getApplicationAttempts(
GetApplicationAttemptsRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetContainerReportResponse getContainerReport(
GetContainerReportRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetContainersResponse getContainers(GetContainersRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public SignalContainerResponse signalToContainer(
SignalContainerRequest request) throws YarnException, IOException {
throw new NotImplementedException();
}
@Override
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
}
}

View File

@ -119,29 +119,41 @@ public abstract class BaseRouterClientRMTest {
return this.clientrmService;
}
@Before
public void setUp() {
this.conf = new YarnConfiguration();
protected YarnConfiguration createConfiguration() {
YarnConfiguration config = new YarnConfiguration();
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain will call the mock resource manager. The others in the chain will
// simply forward it to the next one in the chain
this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
config.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + mockPassThroughInterceptorClass + ","
+ MockClientRequestInterceptor.class.getName());
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
TEST_MAX_CACHE_SIZE);
return config;
}
@Before
public void setUp() {
this.conf = createConfiguration();
this.dispatcher = new AsyncDispatcher();
this.dispatcher.init(conf);
this.dispatcher.start();
this.clientrmService = createAndStartRouterClientRMService();
}
public void setUpConfig() {
this.conf = createConfiguration();
}
protected Configuration getConf() {
return this.conf;
}
@After
public void tearDown() {
if (clientrmService != null) {

View File

@ -0,0 +1,403 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the
* {@code FederationInterceptor} class. The tests for
* {@code RouterClientRMService} has been written cleverly so that it can be
* reused to validate different request intercepter chains.
*/
public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationClientInterceptor.class);
private TestableFederationClientInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private List<SubClusterId> subClusters;
private String user = "test-user";
private final static int NUM_SUBCLUSTER = 4;
@Override
public void setUp() {
super.setUpConfig();
interceptor = new TestableFederationClientInterceptor();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
subClusters = new ArrayList<SubClusterId>();
try {
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
stateStoreUtil.registerSubCluster(sc);
subClusters.add(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain is the federation intercepter that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
/**
* This test validates the correctness of GetNewApplication. The return
* ApplicationId has to belong to one of the SubCluster in the cluster.
*/
@Test
public void testGetNewApplication()
throws YarnException, IOException, InterruptedException {
System.out.println("Test FederationClientInterceptor: Get New Application");
GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
GetNewApplicationResponse response = interceptor.getNewApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(response.getApplicationId());
Assert.assertTrue(
response.getApplicationId().getClusterTimestamp() < NUM_SUBCLUSTER);
Assert.assertTrue(response.getApplicationId().getClusterTimestamp() >= 0);
}
/**
* This test validates the correctness of SubmitApplication. The application
* has to be submitted to one of the SubCluster in the cluster.
*/
@Test
public void testSubmitApplication()
throws YarnException, IOException, InterruptedException {
System.out.println("Test FederationClientInterceptor: Submit Application");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult);
Assert.assertTrue(subClusters.contains(scIdResult));
}
/**
* This test validates the correctness of SubmitApplication in case of
* multiple submission. The first retry has to be submitted to the same
* SubCluster of the first attempt.
*/
@Test
public void testSubmitApplicationMultipleSubmission()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Submit Application - Multiple");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
// First attempt
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult);
// First retry
response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
Assert.assertNotNull(scIdResult2);
Assert.assertEquals(scIdResult, scIdResult);
}
/**
* This test validates the correctness of SubmitApplication in case of empty
* request.
*/
@Test
public void testSubmitApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Submit Application - Empty");
try {
interceptor.submitApplication(null);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContex information."));
}
try {
interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContex information."));
}
try {
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(null, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
interceptor.submitApplication(request);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing submitApplication request or "
+ "applicationSubmissionContex information."));
}
}
/**
* This test validates the correctness of ForceKillApplication in case the
* application exists in the cluster.
*/
@Test
public void testForceKillApplication()
throws YarnException, IOException, InterruptedException {
System.out
.println("Test FederationClientInterceptor: Force Kill Application");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
// Submit the application we are going to kill later
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId);
KillApplicationResponse responseKill =
interceptor.forceKillApplication(requestKill);
Assert.assertNotNull(responseKill);
}
/**
* This test validates the correctness of ForceKillApplication in case of
* application does not exist in StateStore.
*/
@Test
public void testForceKillApplicationNotExists()
throws YarnException, IOException, InterruptedException {
System.out.println("Test FederationClientInterceptor: "
+ "Force Kill Application - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
KillApplicationRequest requestKill =
KillApplicationRequest.newInstance(appId);
try {
interceptor.forceKillApplication(requestKill);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().equals(
"Application " + appId + " does not exist in FederationStateStore"));
}
}
/**
* This test validates the correctness of ForceKillApplication in case of
* empty request.
*/
@Test
public void testForceKillApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Force Kill Application - Empty");
try {
interceptor.forceKillApplication(null);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith(
"Missing forceKillApplication request or ApplicationId."));
}
try {
interceptor
.forceKillApplication(KillApplicationRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().startsWith(
"Missing forceKillApplication request or ApplicationId."));
}
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application exists in the cluster.
*/
@Test
public void testGetApplicationReport()
throws YarnException, IOException, InterruptedException {
System.out
.println("Test FederationClientInterceptor: Get Application Report");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
// Submit the application we want the report later
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
GetApplicationReportRequest requestGet =
GetApplicationReportRequest.newInstance(appId);
GetApplicationReportResponse responseGet =
interceptor.getApplicationReport(requestGet);
Assert.assertNotNull(responseGet);
}
/**
* This test validates the correctness of GetApplicationReport in case the
* application does not exist in StateStore.
*/
@Test
public void testGetApplicationNotExists()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test ApplicationClientProtocol: Get Application Report - Not Exists");
ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
GetApplicationReportRequest requestGet =
GetApplicationReportRequest.newInstance(appId);
try {
interceptor.getApplicationReport(requestGet);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(e.getMessage().equals(
"Application " + appId + " does not exist in FederationStateStore"));
}
}
/**
* This test validates the correctness of GetApplicationReport in case of
* empty request.
*/
@Test
public void testGetApplicationEmptyRequest()
throws YarnException, IOException, InterruptedException {
System.out.println(
"Test FederationClientInterceptor: Get Application Report - Empty");
try {
interceptor.getApplicationReport(null);
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing getApplicationReport request or "
+ "applicationId information."));
}
try {
interceptor
.getApplicationReport(GetApplicationReportRequest.newInstance(null));
Assert.fail();
} catch (YarnException e) {
Assert.assertTrue(
e.getMessage().startsWith("Missing getApplicationReport request or "
+ "applicationId information."));
}
}
}

View File

@ -0,0 +1,295 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
* use the {@code RouterClientRMService} pipeline test cases for testing the
* {@code FederationInterceptor} class. The tests for
* {@code RouterClientRMService} has been written cleverly so that it can be
* reused to validate different request intercepter chains.
*
* It tests the case with SubClusters down and the Router logic of retries. We
* have 1 good SubCluster and 2 bad ones for all the tests.
*/
public class TestFederationClientInterceptorRetry
extends BaseRouterClientRMTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
private TestableFederationClientInterceptor interceptor;
private MemoryFederationStateStore stateStore;
private FederationStateStoreTestUtil stateStoreUtil;
private String user = "test-user";
// running and registered
private static SubClusterId good;
// registered but not running
private static SubClusterId bad1;
private static SubClusterId bad2;
private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
@Override
public void setUp() {
super.setUpConfig();
interceptor = new TestableFederationClientInterceptor();
stateStore = new MemoryFederationStateStore();
stateStore.init(this.getConf());
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
getConf());
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
interceptor.setConf(this.getConf());
interceptor.init(user);
// Create SubClusters
good = SubClusterId.newInstance("0");
bad1 = SubClusterId.newInstance("1");
bad2 = SubClusterId.newInstance("2");
scs.add(good);
scs.add(bad1);
scs.add(bad2);
// The mock RM will not start in these SubClusters, this is done to simulate
// a SubCluster down
interceptor.registerBadSubCluster(bad1);
interceptor.registerBadSubCluster(bad2);
}
@Override
public void tearDown() {
interceptor.shutdown();
super.tearDown();
}
private void setupCluster(List<SubClusterId> scsToRegister)
throws YarnException {
try {
// Clean up the StateStore before every test
stateStoreUtil.deregisterAllSubClusters();
for (SubClusterId sc : scsToRegister) {
stateStoreUtil.registerSubCluster(sc);
}
} catch (YarnException e) {
LOG.error(e.getMessage());
Assert.fail();
}
}
@Override
protected YarnConfiguration createConfiguration() {
YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
String mockPassThroughInterceptorClass =
PassThroughClientRequestInterceptor.class.getName();
// Create a request intercepter pipeline for testing. The last one in the
// chain is the federation intercepter that calls the mock resource manager.
// The others in the chain will simply forward it to the next one in the
// chain
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ "," + TestableFederationClientInterceptor.class.getName());
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
UniformBroadcastPolicyManager.class.getName());
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
return conf;
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 1 bad SubCluster.
*/
@Test
public void testGetNewApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad SubCluster");
setupCluster(Arrays.asList(bad2));
try {
interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
Assert.fail();
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 2 bad SubClusters.
*/
@Test
public void testGetNewApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with two bad SubClusters");
setupCluster(Arrays.asList(bad1, bad2));
try {
interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
Assert.fail();
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
}
/**
* This test validates the correctness of GetNewApplication in case the
* cluster is composed of only 1 bad SubCluster and 1 good one.
*/
@Test
public void testGetNewApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test getNewApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2));
GetNewApplicationResponse response = null;
try {
response =
interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
} catch (Exception e) {
Assert.fail();
}
Assert.assertEquals(Integer.parseInt(good.getId()),
response.getApplicationId().getClusterTimestamp());
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 1 bad SubCluster.
*/
@Test
public void testSubmitApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with one bad SubCluster");
setupCluster(Arrays.asList(bad2));
final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
final SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
try {
interceptor.submitApplication(request);
Assert.fail();
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 2 bad SubClusters.
*/
@Test
public void testSubmitApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with two bad SubClusters");
setupCluster(Arrays.asList(bad1, bad2));
final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
final SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
try {
interceptor.submitApplication(request);
Assert.fail();
} catch (Exception e) {
System.out.println(e.toString());
Assert.assertTrue(e.getMessage()
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
}
}
/**
* This test validates the correctness of SubmitApplication in case the
* cluster is composed of only 1 bad SubCluster and a good one.
*/
@Test
public void testSubmitApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
System.out.println("Test submitApplication with one bad, one good SC");
setupCluster(Arrays.asList(good, bad2));
final ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
final SubmitApplicationRequest request =
SubmitApplicationRequest.newInstance(context);
try {
interceptor.submitApplication(request);
} catch (Exception e) {
Assert.fail();
}
Assert.assertEquals(good,
stateStore
.getApplicationHomeSubCluster(
GetApplicationHomeSubClusterRequest.newInstance(appId))
.getApplicationHomeSubCluster().getHomeSubCluster());
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.router.clientrm;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
/**
* Extends the FederationClientInterceptor and overrides methods to provide a
* testable implementation of FederationClientInterceptor.
*/
public class TestableFederationClientInterceptor
extends FederationClientInterceptor {
private ConcurrentHashMap<SubClusterId, MockResourceManagerFacade> mockRMs =
new ConcurrentHashMap<>();
private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
@Override
protected ApplicationClientProtocol getClientRMProxyForSubCluster(
SubClusterId subClusterId) throws YarnException {
MockResourceManagerFacade mockRM = null;
synchronized (this) {
if (mockRMs.containsKey(subClusterId)) {
mockRM = mockRMs.get(subClusterId);
} else {
mockRM = new MockResourceManagerFacade(super.getConf(), 0,
Integer.parseInt(subClusterId.getId()),
!badSubCluster.contains(subClusterId));
mockRMs.put(subClusterId, mockRM);
}
return mockRM;
}
}
/**
* For testing purpose, some subclusters has to be down to simulate particular
* scenarios as RM Failover, network issues. For this reason we keep track of
* these bad subclusters. This method make the subcluster unusable.
*
* @param badSC the subcluster to make unusable
*/
protected void registerBadSubCluster(SubClusterId badSC) {
badSubCluster.add(badSC);
if (mockRMs.contains(badSC)) {
mockRMs.get(badSC).setRunningMode(false);
}
}
}