YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).
(cherry picked from commit 52daa6d971
)
This commit is contained in:
parent
4cfec943b1
commit
43a97174fe
|
@ -2674,6 +2674,14 @@ public class YarnConfiguration extends Configuration {
|
|||
"org.apache.hadoop.yarn.server.router.rmadmin."
|
||||
+ "DefaultRMAdminRequestInterceptor";
|
||||
|
||||
/**
|
||||
* The number of retries for GetNewApplication and SubmitApplication in
|
||||
* {@code FederationClientInterceptor}.
|
||||
*/
|
||||
public static final String ROUTER_CLIENTRM_SUBMIT_RETRY =
|
||||
ROUTER_PREFIX + "submit.retry";
|
||||
public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
|
||||
|
||||
////////////////////////////////
|
||||
// Other Configs
|
||||
////////////////////////////////
|
||||
|
|
|
@ -136,6 +136,11 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
|||
configurationPrefixToSkipCompare
|
||||
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
|
||||
|
||||
// Ignore all Router Federation variables
|
||||
|
||||
configurationPrefixToSkipCompare
|
||||
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
|
||||
|
||||
// Set by container-executor.cfg
|
||||
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|||
import org.junit.Test;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
|
|
@ -51,7 +51,7 @@ public final class FederationPolicyInitializationContextValidator {
|
|||
|
||||
if (policyContext.getFederationSubclusterResolver() == null) {
|
||||
throw new FederationPolicyInitializationException(
|
||||
"The FederationStateStoreFacase provided is null. Cannot"
|
||||
"The FederationSubclusterResolver provided is null. Cannot"
|
||||
+ " reinitalize successfully.");
|
||||
}
|
||||
|
||||
|
|
|
@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.federation.policies;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
|
@ -41,6 +43,9 @@ public final class FederationPolicyUtils {
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationPolicyUtils.class);
|
||||
|
||||
public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
|
||||
"No active SubCluster available to submit the request.";
|
||||
|
||||
/** Disable constructor. */
|
||||
private FederationPolicyUtils() {
|
||||
}
|
||||
|
@ -165,4 +170,34 @@ public final class FederationPolicyUtils {
|
|||
return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate if there is any active subcluster that is not blacklisted, it will
|
||||
* throw an exception if there are no usable subclusters.
|
||||
*
|
||||
* @param activeSubClusters the list of subClusters as identified by
|
||||
* {@link SubClusterId} currently active.
|
||||
* @param blackListSubClusters the list of subClusters as identified by
|
||||
* {@link SubClusterId} to blackList from the selection of the home
|
||||
* subCluster.
|
||||
* @throws FederationPolicyException if there are no usable subclusters.
|
||||
*/
|
||||
public static void validateSubClusterAvailability(
|
||||
List<SubClusterId> activeSubClusters,
|
||||
List<SubClusterId> blackListSubClusters)
|
||||
throws FederationPolicyException {
|
||||
if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
|
||||
if (blackListSubClusters == null) {
|
||||
return;
|
||||
}
|
||||
for (SubClusterId scId : activeSubClusters) {
|
||||
if (!blackListSubClusters.contains(scId)) {
|
||||
// There is at least one active subcluster
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new FederationPolicyException(
|
||||
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
|
||||
}
|
||||
|
||||
}
|
|
@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -56,7 +57,7 @@ public class RouterPolicyFacade {
|
|||
@VisibleForTesting
|
||||
Map<String, FederationRouterPolicy> globalPolicyMap;
|
||||
|
||||
public RouterPolicyFacade(YarnConfiguration conf,
|
||||
public RouterPolicyFacade(Configuration conf,
|
||||
FederationStateStoreFacade facade, SubClusterResolver resolver,
|
||||
SubClusterId homeSubcluster)
|
||||
throws FederationPolicyInitializationException {
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
|
@ -76,6 +77,10 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
|
|||
Map<SubClusterId, SubClusterInfo> activeSubclusters =
|
||||
getActiveSubclusters();
|
||||
|
||||
FederationPolicyUtils.validateSubClusterAvailability(
|
||||
new ArrayList<SubClusterId>(activeSubclusters.keySet()),
|
||||
blackListSubClusters);
|
||||
|
||||
if (blackListSubClusters != null) {
|
||||
|
||||
// Remove from the active SubClusters from StateStore the blacklisted ones
|
||||
|
|
|
@ -17,12 +17,14 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.federation.policies.router;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
|
@ -72,6 +74,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
|
|||
Map<SubClusterId, SubClusterInfo> activeSubclusters =
|
||||
getActiveSubclusters();
|
||||
|
||||
FederationPolicyUtils.validateSubClusterAvailability(
|
||||
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
|
||||
|
||||
Map<SubClusterIdInfo, Float> weights =
|
||||
getPolicyInfo().getRouterPolicyWeights();
|
||||
SubClusterIdInfo chosen = null;
|
||||
|
|
|
@ -17,11 +17,13 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.federation.policies.router;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
|
@ -44,6 +46,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
|
|||
Map<SubClusterId, SubClusterInfo> activeSubclusters =
|
||||
getActiveSubclusters();
|
||||
|
||||
FederationPolicyUtils.validateSubClusterAvailability(
|
||||
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
|
||||
|
||||
// This finds the sub-cluster with the highest weight among the
|
||||
// currently active ones.
|
||||
Map<SubClusterIdInfo, Float> weights =
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
|
@ -86,6 +87,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
|
|||
|
||||
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
|
||||
|
||||
FederationPolicyUtils.validateSubClusterAvailability(list,
|
||||
blackListSubClusters);
|
||||
|
||||
if (blackListSubClusters != null) {
|
||||
|
||||
// Remove from the active SubClusters from StateStore the blacklisted ones
|
||||
|
|
|
@ -18,12 +18,14 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.federation.policies.router;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
|
@ -51,6 +53,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
|
|||
Map<SubClusterId, SubClusterInfo> activeSubclusters =
|
||||
getActiveSubclusters();
|
||||
|
||||
FederationPolicyUtils.validateSubClusterAvailability(
|
||||
new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
|
||||
|
||||
// note: we cannot pre-compute the weights, as the set of activeSubcluster
|
||||
// changes dynamically (and this would unfairly spread the load to
|
||||
// sub-clusters adjacent to an inactive one), hence we need to count/scan
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -154,6 +155,7 @@ import org.apache.hadoop.yarn.util.Records;
|
|||
import org.junit.Assert;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
/**
|
||||
|
@ -175,6 +177,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
new HashMap<ContainerId, Container>();
|
||||
private AtomicInteger containerIndex = new AtomicInteger(0);
|
||||
private Configuration conf;
|
||||
private int subClusterId;
|
||||
final private AtomicInteger applicationCounter = new AtomicInteger(0);
|
||||
|
||||
// True if the Mock RM is running, false otherwise.
|
||||
// This property allows us to write tests for specific scenario as Yarn RM
|
||||
// down e.g. network issue, failover.
|
||||
private boolean isRunning;
|
||||
|
||||
private boolean shouldReRegisterNext = false;
|
||||
|
||||
|
@ -187,14 +196,25 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
|
||||
public MockResourceManagerFacade(Configuration conf,
|
||||
int startContainerIndex) {
|
||||
this(conf, startContainerIndex, 0, true);
|
||||
}
|
||||
|
||||
public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
|
||||
int subClusterId, boolean isRunning) {
|
||||
this.conf = conf;
|
||||
this.containerIndex.set(startContainerIndex);
|
||||
this.subClusterId = subClusterId;
|
||||
this.isRunning = isRunning;
|
||||
}
|
||||
|
||||
public void setShouldReRegisterNext() {
|
||||
shouldReRegisterNext = true;
|
||||
}
|
||||
|
||||
public void setRunningMode(boolean mode) {
|
||||
this.isRunning = mode;
|
||||
}
|
||||
|
||||
private static String getAppIdentifier() throws IOException {
|
||||
AMRMTokenIdentifier result = null;
|
||||
UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
|
||||
|
@ -208,10 +228,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
return result != null ? result.getApplicationAttemptId().toString() : "";
|
||||
}
|
||||
|
||||
private void validateRunning() throws ConnectException {
|
||||
if (!isRunning) {
|
||||
throw new ConnectException("RM is stopped");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterApplicationMasterResponse registerApplicationMaster(
|
||||
RegisterApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
String amrmToken = getAppIdentifier();
|
||||
LOG.info("Registering application attempt: " + amrmToken);
|
||||
|
||||
|
@ -248,6 +277,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public FinishApplicationMasterResponse finishApplicationMaster(
|
||||
FinishApplicationMasterRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
String amrmToken = getAppIdentifier();
|
||||
LOG.info("Finishing application attempt: " + amrmToken);
|
||||
|
||||
|
@ -284,6 +316,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
if (request.getAskList() != null && request.getAskList().size() > 0
|
||||
&& request.getReleaseList() != null
|
||||
&& request.getReleaseList().size() > 0) {
|
||||
|
@ -391,6 +426,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
GetApplicationReportResponse response =
|
||||
Records.newRecord(GetApplicationReportResponse.class);
|
||||
ApplicationReport report = Records.newRecord(ApplicationReport.class);
|
||||
|
@ -407,6 +444,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
GetApplicationAttemptReportRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
GetApplicationAttemptReportResponse response =
|
||||
Records.newRecord(GetApplicationAttemptReportResponse.class);
|
||||
ApplicationAttemptReport report =
|
||||
|
@ -420,12 +459,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnException, IOException {
|
||||
return GetNewApplicationResponse.newInstance(null, null, null);
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetNewApplicationResponse.newInstance(ApplicationId.newInstance(
|
||||
subClusterId, applicationCounter.incrementAndGet()), null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
ApplicationId appId = null;
|
||||
if (request.getApplicationSubmissionContext() != null) {
|
||||
appId = request.getApplicationSubmissionContext().getApplicationId();
|
||||
|
@ -438,6 +484,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
ApplicationId appId = null;
|
||||
if (request.getApplicationId() != null) {
|
||||
appId = request.getApplicationId();
|
||||
|
@ -453,48 +502,72 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public GetClusterMetricsResponse getClusterMetrics(
|
||||
GetClusterMetricsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetClusterMetricsResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetApplicationsResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetClusterNodesResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetQueueInfoResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetQueueUserAclsInfoResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetDelegationTokenResponse getDelegationToken(
|
||||
GetDelegationTokenRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetDelegationTokenResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RenewDelegationTokenResponse renewDelegationToken(
|
||||
RenewDelegationTokenRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RenewDelegationTokenResponse.newInstance(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||
CancelDelegationTokenRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return CancelDelegationTokenResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -502,36 +575,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return MoveApplicationAcrossQueuesResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetApplicationAttemptsResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetContainerReportResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetContainersResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationSubmissionResponse submitReservation(
|
||||
ReservationSubmissionRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return ReservationSubmissionResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationListResponse listReservations(
|
||||
ReservationListRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return ReservationListResponse
|
||||
.newInstance(new ArrayList<ReservationAllocationState>());
|
||||
}
|
||||
|
@ -539,18 +630,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public ReservationUpdateResponse updateReservation(
|
||||
ReservationUpdateRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return ReservationUpdateResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationDeleteResponse deleteReservation(
|
||||
ReservationDeleteRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return ReservationDeleteResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNodesToLabelsResponse getNodeToLabels(
|
||||
GetNodesToLabelsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetNodesToLabelsResponse
|
||||
.newInstance(new HashMap<NodeId, Set<String>>());
|
||||
}
|
||||
|
@ -558,18 +658,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetClusterNodeLabelsResponse.newInstance(new ArrayList<NodeLabel>());
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetLabelsToNodesResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNewReservationResponse getNewReservation(
|
||||
GetNewReservationRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return GetNewReservationResponse
|
||||
.newInstance(ReservationId.newInstance(0, 0));
|
||||
}
|
||||
|
@ -577,6 +686,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
@Override
|
||||
public FailApplicationAttemptResponse failApplicationAttempt(
|
||||
FailApplicationAttemptRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return FailApplicationAttemptResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -584,12 +696,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return UpdateApplicationPriorityResponse.newInstance(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SignalContainerResponse signalToContainer(
|
||||
SignalContainerRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return new SignalContainerResponsePBImpl();
|
||||
}
|
||||
|
||||
|
@ -597,18 +715,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return UpdateApplicationTimeoutsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshQueuesResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshNodesResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -616,6 +743,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
|
||||
RefreshSuperUserGroupsConfigurationRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshSuperUserGroupsConfigurationResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -623,36 +753,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
|
||||
RefreshUserToGroupsMappingsRequest request)
|
||||
throws StandbyException, YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshUserToGroupsMappingsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshAdminAclsResponse refreshAdminAcls(
|
||||
RefreshAdminAclsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshAdminAclsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshServiceAclsResponse refreshServiceAcls(
|
||||
RefreshServiceAclsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshServiceAclsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateNodeResourceResponse updateNodeResource(
|
||||
UpdateNodeResourceRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return UpdateNodeResourceResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RefreshNodesResourcesResponse refreshNodesResources(
|
||||
RefreshNodesResourcesRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshNodesResourcesResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
|
||||
AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return AddToClusterNodeLabelsResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -660,12 +808,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
|
||||
RemoveFromClusterNodeLabelsRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RemoveFromClusterNodeLabelsResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
|
||||
ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return ReplaceLabelsOnNodeResponse.newInstance();
|
||||
}
|
||||
|
||||
|
@ -673,6 +827,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
|
||||
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return CheckForDecommissioningNodesResponse.newInstance(null);
|
||||
}
|
||||
|
||||
|
@ -680,11 +837,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
|
|||
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
|
||||
RefreshClusterMaxPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return RefreshClusterMaxPriorityResponse.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getGroupsForUser(String user) throws IOException {
|
||||
|
||||
validateRunning();
|
||||
|
||||
return new String[0];
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Random;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
|
||||
|
@ -87,4 +88,31 @@ public abstract class BaseRouterPoliciesTest
|
|||
Assert.assertEquals(removed, chosen);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of blacklist logic in case the cluster
|
||||
* has no active subclusters.
|
||||
*/
|
||||
@Test
|
||||
public void testAllBlacklistSubcluster() throws YarnException {
|
||||
FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
|
||||
ApplicationSubmissionContext applicationSubmissionContext =
|
||||
ApplicationSubmissionContext.newInstance(null, null, null, null, null,
|
||||
false, false, 0, Resources.none(), null, false, null, null);
|
||||
Map<SubClusterId, SubClusterInfo> activeSubClusters =
|
||||
getActiveSubclusters();
|
||||
if (activeSubClusters != null && activeSubClusters.size() > 1
|
||||
&& !(localPolicy instanceof RejectRouterPolicy)) {
|
||||
List<SubClusterId> blacklistSubclusters =
|
||||
new ArrayList<SubClusterId>(activeSubClusters.keySet());
|
||||
try {
|
||||
localPolicy.getHomeSubcluster(applicationSubmissionContext,
|
||||
blacklistSubclusters);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
|
|||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
|
||||
|
@ -69,7 +70,7 @@ public class FederationStateStoreTestUtil {
|
|||
SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
|
||||
}
|
||||
|
||||
private void registerSubCluster(SubClusterId subClusterId)
|
||||
public void registerSubCluster(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
|
||||
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
|
||||
|
@ -164,4 +165,16 @@ public class FederationStateStoreTestUtil {
|
|||
return result.getPolicyConfiguration();
|
||||
}
|
||||
|
||||
public void deregisterAllSubClusters() throws YarnException {
|
||||
for (SubClusterId sc : getAllSubClusterIds(true)) {
|
||||
deRegisterSubCluster(sc);
|
||||
}
|
||||
}
|
||||
|
||||
private void deRegisterSubCluster(SubClusterId subClusterId)
|
||||
throws YarnException {
|
||||
stateStore.deregisterSubCluster(SubClusterDeregisterRequest
|
||||
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -55,6 +55,11 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-common</artifactId>
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Common utility methods used by the Router server.
|
||||
*
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public final class RouterServerUtil {
|
||||
|
||||
/** Disable constructor. */
|
||||
private RouterServerUtil() {
|
||||
}
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(RouterServerUtil.class);
|
||||
|
||||
/**
|
||||
* Throws an exception due to an error.
|
||||
*
|
||||
* @param errMsg the error message
|
||||
* @param t the throwable raised in the called class.
|
||||
* @throws YarnException on failure
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public static void logAndThrowException(String errMsg, Throwable t)
|
||||
throws YarnException {
|
||||
if (t != null) {
|
||||
LOG.error(errMsg, t);
|
||||
throw new YarnException(errMsg, t);
|
||||
} else {
|
||||
LOG.error(errMsg);
|
||||
throw new YarnException(errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -18,7 +18,13 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Implements the {@link ClientRequestInterceptor} interface and provides common
|
||||
|
@ -28,9 +34,16 @@ import org.apache.hadoop.conf.Configuration;
|
|||
*/
|
||||
public abstract class AbstractClientRequestInterceptor
|
||||
implements ClientRequestInterceptor {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractClientRequestInterceptor.class);
|
||||
|
||||
private Configuration conf;
|
||||
private ClientRequestInterceptor nextInterceptor;
|
||||
|
||||
@SuppressWarnings("checkstyle:visibilitymodifier")
|
||||
protected UserGroupInformation user = null;
|
||||
|
||||
/**
|
||||
* Sets the {@link ClientRequestInterceptor} in the chain.
|
||||
*/
|
||||
|
@ -63,9 +76,10 @@ public abstract class AbstractClientRequestInterceptor
|
|||
* Initializes the {@link ClientRequestInterceptor}.
|
||||
*/
|
||||
@Override
|
||||
public void init(String user) {
|
||||
public void init(String userName) {
|
||||
setupUser(userName);
|
||||
if (this.nextInterceptor != null) {
|
||||
this.nextInterceptor.init(user);
|
||||
this.nextInterceptor.init(userName);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -87,4 +101,27 @@ public abstract class AbstractClientRequestInterceptor
|
|||
return this.nextInterceptor;
|
||||
}
|
||||
|
||||
private void setupUser(String userName) {
|
||||
|
||||
try {
|
||||
// Do not create a proxy user if user name matches the user name on
|
||||
// current UGI
|
||||
if (userName.equalsIgnoreCase(
|
||||
UserGroupInformation.getCurrentUser().getUserName())) {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} else {
|
||||
user = UserGroupInformation.createProxyUser(userName,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
String message = "Error while creating Router ClientRM Service for user:";
|
||||
if (user != null) {
|
||||
message += ", user: " + user;
|
||||
}
|
||||
|
||||
LOG.info(message);
|
||||
throw new YarnRuntimeException(message, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
|||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
|
@ -85,8 +84,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRespo
|
|||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -98,27 +95,14 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
*/
|
||||
public class DefaultClientRequestInterceptor
|
||||
extends AbstractClientRequestInterceptor {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
|
||||
private ApplicationClientProtocol clientRMProxy;
|
||||
private UserGroupInformation user = null;
|
||||
|
||||
@Override
|
||||
public void init(String userName) {
|
||||
super.init(userName);
|
||||
|
||||
final Configuration conf = this.getConf();
|
||||
try {
|
||||
// Do not create a proxy user if user name matches the user name on
|
||||
// current UGI
|
||||
if (userName.equalsIgnoreCase(
|
||||
UserGroupInformation.getCurrentUser().getUserName())) {
|
||||
user = UserGroupInformation.getCurrentUser();
|
||||
} else {
|
||||
user = UserGroupInformation.createProxyUser(userName,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
}
|
||||
|
||||
final Configuration conf = this.getConf();
|
||||
|
||||
clientRMProxy =
|
||||
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||
@Override
|
||||
|
@ -127,16 +111,9 @@ public class DefaultClientRequestInterceptor
|
|||
ApplicationClientProtocol.class);
|
||||
}
|
||||
});
|
||||
} catch (IOException e) {
|
||||
String message = "Error while creating Router ClientRM Service for user:";
|
||||
if (user != null) {
|
||||
message += ", user: " + user;
|
||||
}
|
||||
|
||||
LOG.info(message);
|
||||
throw new YarnRuntimeException(message, e);
|
||||
} catch (Exception e) {
|
||||
throw new YarnRuntimeException(e);
|
||||
throw new YarnRuntimeException(
|
||||
"Unable to create the interface to reach the YarnRM", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,684 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Extends the {@code AbstractRequestInterceptorClient} class and provides an
|
||||
* implementation for federation of YARN RM and scaling an application across
|
||||
* multiple YARN SubClusters. All the federation specific implementation is
|
||||
* encapsulated in this class. This is always the last intercepter in the chain.
|
||||
*/
|
||||
public class FederationClientInterceptor
|
||||
extends AbstractClientRequestInterceptor {
|
||||
|
||||
/*
|
||||
* TODO YARN-6740 Federation Router (hiding multiple RMs for
|
||||
* ApplicationClientProtocol) phase 2.
|
||||
*
|
||||
* The current implementation finalized the main 4 calls (getNewApplication,
|
||||
* submitApplication, forceKillApplication and getApplicationReport). Those
|
||||
* allow us to execute applications E2E.
|
||||
*/
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FederationClientInterceptor.class);
|
||||
|
||||
private int numSubmitRetries;
|
||||
private Map<SubClusterId, ApplicationClientProtocol> clientRMProxies;
|
||||
private FederationStateStoreFacade federationFacade;
|
||||
private Random rand;
|
||||
private RouterPolicyFacade policyFacade;
|
||||
|
||||
@Override
|
||||
public void init(String userName) {
|
||||
super.init(userName);
|
||||
|
||||
federationFacade = FederationStateStoreFacade.getInstance();
|
||||
rand = new Random(System.currentTimeMillis());
|
||||
|
||||
final Configuration conf = this.getConf();
|
||||
|
||||
try {
|
||||
policyFacade = new RouterPolicyFacade(conf, federationFacade,
|
||||
this.federationFacade.getSubClusterResolver(), null);
|
||||
} catch (FederationPolicyInitializationException e) {
|
||||
LOG.error(e.getMessage());
|
||||
}
|
||||
|
||||
numSubmitRetries =
|
||||
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
|
||||
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
|
||||
|
||||
clientRMProxies =
|
||||
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNextInterceptor(ClientRequestInterceptor next) {
|
||||
throw new YarnRuntimeException("setNextInterceptor is being called on "
|
||||
+ "FederationClientRequestInterceptor, which should be the last one "
|
||||
+ "in the chain. Check if the interceptor pipeline configuration "
|
||||
+ "is correct");
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected ApplicationClientProtocol getClientRMProxyForSubCluster(
|
||||
SubClusterId subClusterId) throws YarnException {
|
||||
|
||||
if (clientRMProxies.containsKey(subClusterId)) {
|
||||
return clientRMProxies.get(subClusterId);
|
||||
}
|
||||
|
||||
ApplicationClientProtocol clientRMProxy = null;
|
||||
try {
|
||||
clientRMProxy =
|
||||
user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
|
||||
@Override
|
||||
public ApplicationClientProtocol run() throws Exception {
|
||||
return ClientRMProxy.createRMProxy(getConf(),
|
||||
ApplicationClientProtocol.class);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
RouterServerUtil.logAndThrowException(
|
||||
"Unable to create the interface to reach the SubCluster "
|
||||
+ subClusterId,
|
||||
e);
|
||||
}
|
||||
|
||||
clientRMProxies.put(subClusterId, clientRMProxy);
|
||||
return clientRMProxy;
|
||||
}
|
||||
|
||||
private SubClusterId getRandomActiveSubCluster(
|
||||
Map<SubClusterId, SubClusterInfo> activeSubclusters)
|
||||
throws YarnException {
|
||||
|
||||
if (activeSubclusters == null || activeSubclusters.size() < 1) {
|
||||
RouterServerUtil.logAndThrowException(
|
||||
FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
|
||||
}
|
||||
List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
|
||||
|
||||
return list.get(rand.nextInt(list.size()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Yarn Router forwards every getNewApplication requests to any RM. During
|
||||
* this operation there will be no communication with the State Store. The
|
||||
* Router will forward the requests to any SubCluster. The Router will retry
|
||||
* to submit the request on #numSubmitRetries different SubClusters. The
|
||||
* SubClusters are randomly chosen from the active ones.
|
||||
*
|
||||
* Possible failures and behaviors:
|
||||
*
|
||||
* Client: identical behavior as {@code ClientRMService}.
|
||||
*
|
||||
* Router: the Client will timeout and resubmit.
|
||||
*
|
||||
* ResourceManager: the Router will timeout and contacts another RM.
|
||||
*
|
||||
* StateStore: not in the execution.
|
||||
*/
|
||||
@Override
|
||||
public GetNewApplicationResponse getNewApplication(
|
||||
GetNewApplicationRequest request) throws YarnException, IOException {
|
||||
Map<SubClusterId, SubClusterInfo> subClustersActive =
|
||||
federationFacade.getSubClusters(true);
|
||||
|
||||
for (int i = 0; i < numSubmitRetries; ++i) {
|
||||
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
|
||||
LOG.debug(
|
||||
"getNewApplication try #" + i + " on SubCluster " + subClusterId);
|
||||
ApplicationClientProtocol clientRMProxy =
|
||||
getClientRMProxyForSubCluster(subClusterId);
|
||||
GetNewApplicationResponse response = null;
|
||||
try {
|
||||
response = clientRMProxy.getNewApplication(request);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to create a new ApplicationId in SubCluster "
|
||||
+ subClusterId.getId(), e);
|
||||
}
|
||||
|
||||
if (response != null) {
|
||||
return response;
|
||||
} else {
|
||||
// Empty response from the ResourceManager.
|
||||
// Blacklist this subcluster for this request.
|
||||
subClustersActive.remove(subClusterId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
String errMsg = "Fail to create a new application.";
|
||||
LOG.error(errMsg);
|
||||
throw new YarnException(errMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Today, in YARN there are no checks of any applicationId submitted.
|
||||
*
|
||||
* Base scenarios:
|
||||
*
|
||||
* The Client submits an application to the Router. • The Router selects one
|
||||
* SubCluster to forward the request. • The Router inserts a tuple into
|
||||
* StateStore with the selected SubCluster (e.g. SC1) and the appId. • The
|
||||
* State Store replies with the selected SubCluster (e.g. SC1). • The Router
|
||||
* submits the request to the selected SubCluster.
|
||||
*
|
||||
* In case of State Store failure:
|
||||
*
|
||||
* The client submits an application to the Router. • The Router selects one
|
||||
* SubCluster to forward the request. • The Router inserts a tuple into State
|
||||
* Store with the selected SubCluster (e.g. SC1) and the appId. • Due to the
|
||||
* State Store down the Router times out and it will retry depending on the
|
||||
* FederationFacade settings. • The Router replies to the client with an error
|
||||
* message.
|
||||
*
|
||||
* If State Store fails after inserting the tuple: identical behavior as
|
||||
* {@code ClientRMService}.
|
||||
*
|
||||
* In case of Router failure:
|
||||
*
|
||||
* Scenario 1 – Crash before submission to the ResourceManager
|
||||
*
|
||||
* The Client submits an application to the Router. • The Router selects one
|
||||
* SubCluster to forward the request. • The Router inserts a tuple into State
|
||||
* Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
|
||||
* crashes. • The Client timeouts and resubmits the application. • The Router
|
||||
* selects one SubCluster to forward the request. • The Router inserts a tuple
|
||||
* into State Store with the selected SubCluster (e.g. SC2) and the appId. •
|
||||
* Because the tuple is already inserted in the State Store, it returns the
|
||||
* previous selected SubCluster (e.g. SC1). • The Router submits the request
|
||||
* to the selected SubCluster (e.g. SC1).
|
||||
*
|
||||
* Scenario 2 – Crash after submission to the ResourceManager
|
||||
*
|
||||
* • The Client submits an application to the Router. • The Router selects one
|
||||
* SubCluster to forward the request. • The Router inserts a tuple into State
|
||||
* Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
|
||||
* submits the request to the selected SubCluster. • The Router crashes. • The
|
||||
* Client timeouts and resubmit the application. • The Router selects one
|
||||
* SubCluster to forward the request. • The Router inserts a tuple into State
|
||||
* Store with the selected SubCluster (e.g. SC2) and the appId. • The State
|
||||
* Store replies with the selected SubCluster (e.g. SC1). • The Router submits
|
||||
* the request to the selected SubCluster (e.g. SC1). When a client re-submits
|
||||
* the same application to the same RM, it does not raise an exception and
|
||||
* replies with operation successful message.
|
||||
*
|
||||
* In case of Client failure: identical behavior as {@code ClientRMService}.
|
||||
*
|
||||
* In case of ResourceManager failure:
|
||||
*
|
||||
* The Client submits an application to the Router. • The Router selects one
|
||||
* SubCluster to forward the request. • The Router inserts a tuple into State
|
||||
* Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
|
||||
* submits the request to the selected SubCluster. • The entire SubCluster is
|
||||
* down – all the RMs in HA or the master RM is not reachable. • The Router
|
||||
* times out. • The Router selects a new SubCluster to forward the request. •
|
||||
* The Router update a tuple into State Store with the selected SubCluster
|
||||
* (e.g. SC2) and the appId. • The State Store replies with OK answer. • The
|
||||
* Router submits the request to the selected SubCluster (e.g. SC2).
|
||||
*/
|
||||
@Override
|
||||
public SubmitApplicationResponse submitApplication(
|
||||
SubmitApplicationRequest request) throws YarnException, IOException {
|
||||
if (request == null || request.getApplicationSubmissionContext() == null
|
||||
|| request.getApplicationSubmissionContext()
|
||||
.getApplicationId() == null) {
|
||||
RouterServerUtil
|
||||
.logAndThrowException("Missing submitApplication request or "
|
||||
+ "applicationSubmissionContex information.", null);
|
||||
}
|
||||
|
||||
ApplicationId applicationId =
|
||||
request.getApplicationSubmissionContext().getApplicationId();
|
||||
|
||||
List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
|
||||
|
||||
for (int i = 0; i < numSubmitRetries; ++i) {
|
||||
|
||||
SubClusterId subClusterId = policyFacade.getHomeSubcluster(
|
||||
request.getApplicationSubmissionContext(), blacklist);
|
||||
LOG.info("submitApplication appId" + applicationId + " try #" + i
|
||||
+ " on SubCluster " + subClusterId);
|
||||
|
||||
ApplicationHomeSubCluster appHomeSubCluster =
|
||||
ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
|
||||
|
||||
if (i == 0) {
|
||||
try {
|
||||
// persist the mapping of applicationId and the subClusterId which has
|
||||
// been selected as its home
|
||||
subClusterId =
|
||||
federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
|
||||
} catch (YarnException e) {
|
||||
String message = "Unable to insert the ApplicationId " + applicationId
|
||||
+ " into the FederationStateStore";
|
||||
RouterServerUtil.logAndThrowException(message, e);
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
// update the mapping of applicationId and the home subClusterId to
|
||||
// the new subClusterId we have selected
|
||||
federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
|
||||
} catch (YarnException e) {
|
||||
String message = "Unable to update the ApplicationId " + applicationId
|
||||
+ " into the FederationStateStore";
|
||||
SubClusterId subClusterIdInStateStore =
|
||||
federationFacade.getApplicationHomeSubCluster(applicationId);
|
||||
if (subClusterId == subClusterIdInStateStore) {
|
||||
LOG.info("Application " + applicationId
|
||||
+ " already submitted on SubCluster " + subClusterId);
|
||||
} else {
|
||||
RouterServerUtil.logAndThrowException(message, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ApplicationClientProtocol clientRMProxy =
|
||||
getClientRMProxyForSubCluster(subClusterId);
|
||||
|
||||
SubmitApplicationResponse response = null;
|
||||
try {
|
||||
response = clientRMProxy.submitApplication(request);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Unable to submit the application " + applicationId
|
||||
+ "to SubCluster " + subClusterId.getId(), e);
|
||||
}
|
||||
|
||||
if (response != null) {
|
||||
LOG.info("Application "
|
||||
+ request.getApplicationSubmissionContext().getApplicationName()
|
||||
+ " with appId " + applicationId + " submitted on " + subClusterId);
|
||||
return response;
|
||||
} else {
|
||||
// Empty response from the ResourceManager.
|
||||
// Blacklist this subcluster for this request.
|
||||
blacklist.add(subClusterId);
|
||||
}
|
||||
}
|
||||
|
||||
String errMsg = "Application "
|
||||
+ request.getApplicationSubmissionContext().getApplicationName()
|
||||
+ " with appId " + applicationId + " failed to be submitted.";
|
||||
LOG.error(errMsg);
|
||||
throw new YarnException(errMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* The Yarn Router will forward to the respective Yarn RM in which the AM is
|
||||
* running.
|
||||
*
|
||||
* Possible failures and behaviors:
|
||||
*
|
||||
* Client: identical behavior as {@code ClientRMService}.
|
||||
*
|
||||
* Router: the Client will timeout and resubmit the request.
|
||||
*
|
||||
* ResourceManager: the Router will timeout and the call will fail.
|
||||
*
|
||||
* State Store: the Router will timeout and it will retry depending on the
|
||||
* FederationFacade settings - if the failure happened before the select
|
||||
* operation.
|
||||
*/
|
||||
@Override
|
||||
public KillApplicationResponse forceKillApplication(
|
||||
KillApplicationRequest request) throws YarnException, IOException {
|
||||
|
||||
if (request == null || request.getApplicationId() == null) {
|
||||
RouterServerUtil.logAndThrowException(
|
||||
"Missing forceKillApplication request or ApplicationId.", null);
|
||||
}
|
||||
ApplicationId applicationId = request.getApplicationId();
|
||||
SubClusterId subClusterId = null;
|
||||
|
||||
try {
|
||||
subClusterId = federationFacade
|
||||
.getApplicationHomeSubCluster(request.getApplicationId());
|
||||
} catch (YarnException e) {
|
||||
RouterServerUtil.logAndThrowException("Application " + applicationId
|
||||
+ " does not exist in FederationStateStore", e);
|
||||
}
|
||||
|
||||
ApplicationClientProtocol clientRMProxy =
|
||||
getClientRMProxyForSubCluster(subClusterId);
|
||||
|
||||
KillApplicationResponse response = null;
|
||||
try {
|
||||
LOG.info("forceKillApplication " + applicationId + " on SubCluster "
|
||||
+ subClusterId);
|
||||
response = clientRMProxy.forceKillApplication(request);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unable to kill the application report for "
|
||||
+ request.getApplicationId() + "to SubCluster "
|
||||
+ subClusterId.getId(), e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
LOG.error("No response when attempting to kill the application "
|
||||
+ applicationId + " to SubCluster " + subClusterId.getId());
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* The Yarn Router will forward to the respective Yarn RM in which the AM is
|
||||
* running.
|
||||
*
|
||||
* Possible failure:
|
||||
*
|
||||
* Client: identical behavior as {@code ClientRMService}.
|
||||
*
|
||||
* Router: the Client will timeout and resubmit the request.
|
||||
*
|
||||
* ResourceManager: the Router will timeout and the call will fail.
|
||||
*
|
||||
* State Store: the Router will timeout and it will retry depending on the
|
||||
* FederationFacade settings - if the failure happened before the select
|
||||
* operation.
|
||||
*/
|
||||
@Override
|
||||
public GetApplicationReportResponse getApplicationReport(
|
||||
GetApplicationReportRequest request) throws YarnException, IOException {
|
||||
|
||||
if (request == null || request.getApplicationId() == null) {
|
||||
RouterServerUtil.logAndThrowException(
|
||||
"Missing getApplicationReport request or applicationId information.",
|
||||
null);
|
||||
}
|
||||
|
||||
SubClusterId subClusterId = null;
|
||||
|
||||
try {
|
||||
subClusterId = federationFacade
|
||||
.getApplicationHomeSubCluster(request.getApplicationId());
|
||||
} catch (YarnException e) {
|
||||
RouterServerUtil
|
||||
.logAndThrowException("Application " + request.getApplicationId()
|
||||
+ " does not exist in FederationStateStore", e);
|
||||
}
|
||||
|
||||
ApplicationClientProtocol clientRMProxy =
|
||||
getClientRMProxyForSubCluster(subClusterId);
|
||||
|
||||
GetApplicationReportResponse response = null;
|
||||
try {
|
||||
response = clientRMProxy.getApplicationReport(request);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Unable to get the application report for "
|
||||
+ request.getApplicationId() + "to SubCluster "
|
||||
+ subClusterId.getId(), e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
LOG.error("No response when attempting to retrieve the report of "
|
||||
+ "the application " + request.getApplicationId() + " to SubCluster "
|
||||
+ subClusterId.getId());
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationsResponse getApplications(GetApplicationsRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterMetricsResponse getClusterMetrics(
|
||||
GetClusterMetricsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetQueueUserAclsInfoResponse getQueueUserAcls(
|
||||
GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
|
||||
MoveApplicationAcrossQueuesRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNewReservationResponse getNewReservation(
|
||||
GetNewReservationRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationSubmissionResponse submitReservation(
|
||||
ReservationSubmissionRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationListResponse listReservations(
|
||||
ReservationListRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationUpdateResponse updateReservation(
|
||||
ReservationUpdateRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReservationDeleteResponse deleteReservation(
|
||||
ReservationDeleteRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetNodesToLabelsResponse getNodeToLabels(
|
||||
GetNodesToLabelsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetLabelsToNodesResponse getLabelsToNodes(
|
||||
GetLabelsToNodesRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetClusterNodeLabelsResponse getClusterNodeLabels(
|
||||
GetClusterNodeLabelsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||
GetApplicationAttemptReportRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainerReportResponse getContainerReport(
|
||||
GetContainerReportRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetDelegationTokenResponse getDelegationToken(
|
||||
GetDelegationTokenRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RenewDelegationTokenResponse renewDelegationToken(
|
||||
RenewDelegationTokenRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||
CancelDelegationTokenRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FailApplicationAttemptResponse failApplicationAttempt(
|
||||
FailApplicationAttemptRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationPriorityResponse updateApplicationPriority(
|
||||
UpdateApplicationPriorityRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SignalContainerResponse signalToContainer(
|
||||
SignalContainerRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
|
||||
UpdateApplicationTimeoutsRequest request)
|
||||
throws YarnException, IOException {
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
}
|
|
@ -119,29 +119,41 @@ public abstract class BaseRouterClientRMTest {
|
|||
return this.clientrmService;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
this.conf = new YarnConfiguration();
|
||||
protected YarnConfiguration createConfiguration() {
|
||||
YarnConfiguration config = new YarnConfiguration();
|
||||
String mockPassThroughInterceptorClass =
|
||||
PassThroughClientRequestInterceptor.class.getName();
|
||||
|
||||
// Create a request intercepter pipeline for testing. The last one in the
|
||||
// chain will call the mock resource manager. The others in the chain will
|
||||
// simply forward it to the next one in the chain
|
||||
this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
|
||||
config.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
|
||||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||
+ "," + mockPassThroughInterceptorClass + ","
|
||||
+ MockClientRequestInterceptor.class.getName());
|
||||
|
||||
this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
|
||||
TEST_MAX_CACHE_SIZE);
|
||||
return config;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
this.conf = createConfiguration();
|
||||
this.dispatcher = new AsyncDispatcher();
|
||||
this.dispatcher.init(conf);
|
||||
this.dispatcher.start();
|
||||
this.clientrmService = createAndStartRouterClientRMService();
|
||||
}
|
||||
|
||||
public void setUpConfig() {
|
||||
this.conf = createConfiguration();
|
||||
}
|
||||
|
||||
protected Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (clientrmService != null) {
|
||||
|
|
|
@ -0,0 +1,403 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
|
||||
* use the {@code RouterClientRMService} pipeline test cases for testing the
|
||||
* {@code FederationInterceptor} class. The tests for
|
||||
* {@code RouterClientRMService} has been written cleverly so that it can be
|
||||
* reused to validate different request intercepter chains.
|
||||
*/
|
||||
public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestFederationClientInterceptor.class);
|
||||
|
||||
private TestableFederationClientInterceptor interceptor;
|
||||
private MemoryFederationStateStore stateStore;
|
||||
private FederationStateStoreTestUtil stateStoreUtil;
|
||||
private List<SubClusterId> subClusters;
|
||||
|
||||
private String user = "test-user";
|
||||
|
||||
private final static int NUM_SUBCLUSTER = 4;
|
||||
|
||||
@Override
|
||||
public void setUp() {
|
||||
super.setUpConfig();
|
||||
interceptor = new TestableFederationClientInterceptor();
|
||||
|
||||
stateStore = new MemoryFederationStateStore();
|
||||
stateStore.init(this.getConf());
|
||||
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
|
||||
getConf());
|
||||
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
|
||||
|
||||
interceptor.setConf(this.getConf());
|
||||
interceptor.init(user);
|
||||
|
||||
subClusters = new ArrayList<SubClusterId>();
|
||||
|
||||
try {
|
||||
for (int i = 0; i < NUM_SUBCLUSTER; i++) {
|
||||
SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
|
||||
stateStoreUtil.registerSubCluster(sc);
|
||||
subClusters.add(sc);
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
LOG.error(e.getMessage());
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() {
|
||||
interceptor.shutdown();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected YarnConfiguration createConfiguration() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||
String mockPassThroughInterceptorClass =
|
||||
PassThroughClientRequestInterceptor.class.getName();
|
||||
|
||||
// Create a request intercepter pipeline for testing. The last one in the
|
||||
// chain is the federation intercepter that calls the mock resource manager.
|
||||
// The others in the chain will simply forward it to the next one in the
|
||||
// chain
|
||||
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
|
||||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||
+ "," + TestableFederationClientInterceptor.class.getName());
|
||||
|
||||
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
|
||||
UniformBroadcastPolicyManager.class.getName());
|
||||
|
||||
// Disable StateStoreFacade cache
|
||||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNewApplication. The return
|
||||
* ApplicationId has to belong to one of the SubCluster in the cluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNewApplication()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test FederationClientInterceptor: Get New Application");
|
||||
|
||||
GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
|
||||
GetNewApplicationResponse response = interceptor.getNewApplication(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertNotNull(response.getApplicationId());
|
||||
Assert.assertTrue(
|
||||
response.getApplicationId().getClusterTimestamp() < NUM_SUBCLUSTER);
|
||||
Assert.assertTrue(response.getApplicationId().getClusterTimestamp() >= 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of SubmitApplication. The application
|
||||
* has to be submitted to one of the SubCluster in the cluster.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitApplication()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test FederationClientInterceptor: Submit Application");
|
||||
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
|
||||
SubmitApplicationResponse response = interceptor.submitApplication(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
|
||||
Assert.assertNotNull(scIdResult);
|
||||
Assert.assertTrue(subClusters.contains(scIdResult));
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of SubmitApplication in case of
|
||||
* multiple submission. The first retry has to be submitted to the same
|
||||
* SubCluster of the first attempt.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitApplicationMultipleSubmission()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println(
|
||||
"Test FederationClientInterceptor: Submit Application - Multiple");
|
||||
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
|
||||
// First attempt
|
||||
SubmitApplicationResponse response = interceptor.submitApplication(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
|
||||
Assert.assertNotNull(scIdResult);
|
||||
|
||||
// First retry
|
||||
response = interceptor.submitApplication(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
|
||||
Assert.assertNotNull(scIdResult2);
|
||||
Assert.assertEquals(scIdResult, scIdResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of SubmitApplication in case of empty
|
||||
* request.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitApplicationEmptyRequest()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println(
|
||||
"Test FederationClientInterceptor: Submit Application - Empty");
|
||||
try {
|
||||
interceptor.submitApplication(null);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Missing submitApplication request or "
|
||||
+ "applicationSubmissionContex information."));
|
||||
}
|
||||
try {
|
||||
interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Missing submitApplication request or "
|
||||
+ "applicationSubmissionContex information."));
|
||||
}
|
||||
try {
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(null, "", "", null, null, false, false, -1, null, null);
|
||||
SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
interceptor.submitApplication(request);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Missing submitApplication request or "
|
||||
+ "applicationSubmissionContex information."));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of ForceKillApplication in case the
|
||||
* application exists in the cluster.
|
||||
*/
|
||||
@Test
|
||||
public void testForceKillApplication()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out
|
||||
.println("Test FederationClientInterceptor: Force Kill Application");
|
||||
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
|
||||
SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
// Submit the application we are going to kill later
|
||||
SubmitApplicationResponse response = interceptor.submitApplication(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
|
||||
|
||||
KillApplicationRequest requestKill =
|
||||
KillApplicationRequest.newInstance(appId);
|
||||
KillApplicationResponse responseKill =
|
||||
interceptor.forceKillApplication(requestKill);
|
||||
Assert.assertNotNull(responseKill);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of ForceKillApplication in case of
|
||||
* application does not exist in StateStore.
|
||||
*/
|
||||
@Test
|
||||
public void testForceKillApplicationNotExists()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test FederationClientInterceptor: "
|
||||
+ "Force Kill Application - Not Exists");
|
||||
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
KillApplicationRequest requestKill =
|
||||
KillApplicationRequest.newInstance(appId);
|
||||
try {
|
||||
interceptor.forceKillApplication(requestKill);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage().equals(
|
||||
"Application " + appId + " does not exist in FederationStateStore"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of ForceKillApplication in case of
|
||||
* empty request.
|
||||
*/
|
||||
@Test
|
||||
public void testForceKillApplicationEmptyRequest()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println(
|
||||
"Test FederationClientInterceptor: Force Kill Application - Empty");
|
||||
try {
|
||||
interceptor.forceKillApplication(null);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage().startsWith(
|
||||
"Missing forceKillApplication request or ApplicationId."));
|
||||
}
|
||||
try {
|
||||
interceptor
|
||||
.forceKillApplication(KillApplicationRequest.newInstance(null));
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage().startsWith(
|
||||
"Missing forceKillApplication request or ApplicationId."));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApplicationReport in case the
|
||||
* application exists in the cluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetApplicationReport()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out
|
||||
.println("Test FederationClientInterceptor: Get Application Report");
|
||||
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
|
||||
SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
// Submit the application we want the report later
|
||||
SubmitApplicationResponse response = interceptor.submitApplication(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
|
||||
|
||||
GetApplicationReportRequest requestGet =
|
||||
GetApplicationReportRequest.newInstance(appId);
|
||||
|
||||
GetApplicationReportResponse responseGet =
|
||||
interceptor.getApplicationReport(requestGet);
|
||||
|
||||
Assert.assertNotNull(responseGet);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApplicationReport in case the
|
||||
* application does not exist in StateStore.
|
||||
*/
|
||||
@Test
|
||||
public void testGetApplicationNotExists()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println(
|
||||
"Test ApplicationClientProtocol: Get Application Report - Not Exists");
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
GetApplicationReportRequest requestGet =
|
||||
GetApplicationReportRequest.newInstance(appId);
|
||||
try {
|
||||
interceptor.getApplicationReport(requestGet);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(e.getMessage().equals(
|
||||
"Application " + appId + " does not exist in FederationStateStore"));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetApplicationReport in case of
|
||||
* empty request.
|
||||
*/
|
||||
@Test
|
||||
public void testGetApplicationEmptyRequest()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println(
|
||||
"Test FederationClientInterceptor: Get Application Report - Empty");
|
||||
try {
|
||||
interceptor.getApplicationReport(null);
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Missing getApplicationReport request or "
|
||||
+ "applicationId information."));
|
||||
}
|
||||
try {
|
||||
interceptor
|
||||
.getApplicationReport(GetApplicationReportRequest.newInstance(null));
|
||||
Assert.fail();
|
||||
} catch (YarnException e) {
|
||||
Assert.assertTrue(
|
||||
e.getMessage().startsWith("Missing getApplicationReport request or "
|
||||
+ "applicationId information."));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,295 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
|
||||
* use the {@code RouterClientRMService} pipeline test cases for testing the
|
||||
* {@code FederationInterceptor} class. The tests for
|
||||
* {@code RouterClientRMService} has been written cleverly so that it can be
|
||||
* reused to validate different request intercepter chains.
|
||||
*
|
||||
* It tests the case with SubClusters down and the Router logic of retries. We
|
||||
* have 1 good SubCluster and 2 bad ones for all the tests.
|
||||
*/
|
||||
public class TestFederationClientInterceptorRetry
|
||||
extends BaseRouterClientRMTest {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
|
||||
|
||||
private TestableFederationClientInterceptor interceptor;
|
||||
private MemoryFederationStateStore stateStore;
|
||||
private FederationStateStoreTestUtil stateStoreUtil;
|
||||
|
||||
private String user = "test-user";
|
||||
|
||||
// running and registered
|
||||
private static SubClusterId good;
|
||||
|
||||
// registered but not running
|
||||
private static SubClusterId bad1;
|
||||
private static SubClusterId bad2;
|
||||
|
||||
private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
|
||||
|
||||
@Override
|
||||
public void setUp() {
|
||||
super.setUpConfig();
|
||||
interceptor = new TestableFederationClientInterceptor();
|
||||
|
||||
stateStore = new MemoryFederationStateStore();
|
||||
stateStore.init(this.getConf());
|
||||
FederationStateStoreFacade.getInstance().reinitialize(stateStore,
|
||||
getConf());
|
||||
stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
|
||||
|
||||
interceptor.setConf(this.getConf());
|
||||
interceptor.init(user);
|
||||
|
||||
// Create SubClusters
|
||||
good = SubClusterId.newInstance("0");
|
||||
bad1 = SubClusterId.newInstance("1");
|
||||
bad2 = SubClusterId.newInstance("2");
|
||||
scs.add(good);
|
||||
scs.add(bad1);
|
||||
scs.add(bad2);
|
||||
|
||||
// The mock RM will not start in these SubClusters, this is done to simulate
|
||||
// a SubCluster down
|
||||
|
||||
interceptor.registerBadSubCluster(bad1);
|
||||
interceptor.registerBadSubCluster(bad2);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() {
|
||||
interceptor.shutdown();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
private void setupCluster(List<SubClusterId> scsToRegister)
|
||||
throws YarnException {
|
||||
|
||||
try {
|
||||
// Clean up the StateStore before every test
|
||||
stateStoreUtil.deregisterAllSubClusters();
|
||||
|
||||
for (SubClusterId sc : scsToRegister) {
|
||||
stateStoreUtil.registerSubCluster(sc);
|
||||
}
|
||||
} catch (YarnException e) {
|
||||
LOG.error(e.getMessage());
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected YarnConfiguration createConfiguration() {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
|
||||
String mockPassThroughInterceptorClass =
|
||||
PassThroughClientRequestInterceptor.class.getName();
|
||||
|
||||
// Create a request intercepter pipeline for testing. The last one in the
|
||||
// chain is the federation intercepter that calls the mock resource manager.
|
||||
// The others in the chain will simply forward it to the next one in the
|
||||
// chain
|
||||
conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
|
||||
mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
|
||||
+ "," + TestableFederationClientInterceptor.class.getName());
|
||||
|
||||
conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
|
||||
UniformBroadcastPolicyManager.class.getName());
|
||||
|
||||
// Disable StateStoreFacade cache
|
||||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNewApplication in case the
|
||||
* cluster is composed of only 1 bad SubCluster.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNewApplicationOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
System.out.println("Test getNewApplication with one bad SubCluster");
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
|
||||
try {
|
||||
interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
System.out.println(e.toString());
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNewApplication in case the
|
||||
* cluster is composed of only 2 bad SubClusters.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNewApplicationTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test getNewApplication with two bad SubClusters");
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
try {
|
||||
interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
System.out.println(e.toString());
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of GetNewApplication in case the
|
||||
* cluster is composed of only 1 bad SubCluster and 1 good one.
|
||||
*/
|
||||
@Test
|
||||
public void testGetNewApplicationOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test getNewApplication with one bad, one good SC");
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
GetNewApplicationResponse response = null;
|
||||
try {
|
||||
response =
|
||||
interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
|
||||
} catch (Exception e) {
|
||||
Assert.fail();
|
||||
}
|
||||
Assert.assertEquals(Integer.parseInt(good.getId()),
|
||||
response.getApplicationId().getClusterTimestamp());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of SubmitApplication in case the
|
||||
* cluster is composed of only 1 bad SubCluster.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitApplicationOneBadSC()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
|
||||
System.out.println("Test submitApplication with one bad SubCluster");
|
||||
setupCluster(Arrays.asList(bad2));
|
||||
|
||||
final ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
final SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
try {
|
||||
interceptor.submitApplication(request);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
System.out.println(e.toString());
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of SubmitApplication in case the
|
||||
* cluster is composed of only 2 bad SubClusters.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitApplicationTwoBadSCs()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test submitApplication with two bad SubClusters");
|
||||
setupCluster(Arrays.asList(bad1, bad2));
|
||||
|
||||
final ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
final SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
try {
|
||||
interceptor.submitApplication(request);
|
||||
Assert.fail();
|
||||
} catch (Exception e) {
|
||||
System.out.println(e.toString());
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates the correctness of SubmitApplication in case the
|
||||
* cluster is composed of only 1 bad SubCluster and a good one.
|
||||
*/
|
||||
@Test
|
||||
public void testSubmitApplicationOneBadOneGood()
|
||||
throws YarnException, IOException, InterruptedException {
|
||||
System.out.println("Test submitApplication with one bad, one good SC");
|
||||
setupCluster(Arrays.asList(good, bad2));
|
||||
|
||||
final ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
|
||||
ApplicationSubmissionContext context = ApplicationSubmissionContext
|
||||
.newInstance(appId, "", "", null, null, false, false, -1, null, null);
|
||||
final SubmitApplicationRequest request =
|
||||
SubmitApplicationRequest.newInstance(context);
|
||||
try {
|
||||
interceptor.submitApplication(request);
|
||||
} catch (Exception e) {
|
||||
Assert.fail();
|
||||
}
|
||||
Assert.assertEquals(good,
|
||||
stateStore
|
||||
.getApplicationHomeSubCluster(
|
||||
GetApplicationHomeSubClusterRequest.newInstance(appId))
|
||||
.getApplicationHomeSubCluster().getHomeSubCluster());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.router.clientrm;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
|
||||
/**
|
||||
* Extends the FederationClientInterceptor and overrides methods to provide a
|
||||
* testable implementation of FederationClientInterceptor.
|
||||
*/
|
||||
public class TestableFederationClientInterceptor
|
||||
extends FederationClientInterceptor {
|
||||
|
||||
private ConcurrentHashMap<SubClusterId, MockResourceManagerFacade> mockRMs =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
|
||||
|
||||
@Override
|
||||
protected ApplicationClientProtocol getClientRMProxyForSubCluster(
|
||||
SubClusterId subClusterId) throws YarnException {
|
||||
|
||||
MockResourceManagerFacade mockRM = null;
|
||||
synchronized (this) {
|
||||
if (mockRMs.containsKey(subClusterId)) {
|
||||
mockRM = mockRMs.get(subClusterId);
|
||||
} else {
|
||||
mockRM = new MockResourceManagerFacade(super.getConf(), 0,
|
||||
Integer.parseInt(subClusterId.getId()),
|
||||
!badSubCluster.contains(subClusterId));
|
||||
mockRMs.put(subClusterId, mockRM);
|
||||
|
||||
}
|
||||
return mockRM;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For testing purpose, some subclusters has to be down to simulate particular
|
||||
* scenarios as RM Failover, network issues. For this reason we keep track of
|
||||
* these bad subclusters. This method make the subcluster unusable.
|
||||
*
|
||||
* @param badSC the subcluster to make unusable
|
||||
*/
|
||||
protected void registerBadSubCluster(SubClusterId badSC) {
|
||||
badSubCluster.add(badSC);
|
||||
if (mockRMs.contains(badSC)) {
|
||||
mockRMs.get(badSC).setRunningMode(false);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue