YARN-11177. Support getNewReservation, submit / update/ Reservation API's for Federation. (#4764)

This commit is contained in:
slfan1989 2022-09-02 07:35:20 +08:00 committed by GitHub
parent 20560401ec
commit 37e213c3fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 829 additions and 9 deletions

View File

@ -17,11 +17,18 @@
package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
@ -29,11 +36,15 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.when;
/**
* Simple test class for the {@link LoadBasedRouterPolicy}. Test that the load
* is properly considered for allocation.
@ -134,4 +145,57 @@ public class TestLoadBasedRouterPolicy extends BaseRouterPoliciesTest {
() -> ((FederationRouterPolicy) policy).
getHomeSubcluster(getApplicationSubmissionContext(), null));
}
/**
 * Verifies that an application carrying a reservation id is routed to the
 * sub-cluster currently recorded for that reservation, after the recorded
 * home sub-cluster has been updated through the state store facade.
 *
 * @throws YarnException if the policy or the facade calls fail.
 */
@Test
public void testUpdateReservation() throws YarnException {
long now = Time.now();
ReservationSubmissionRequest resReq = getReservationSubmissionRequest();
when(resReq.getQueue()).thenReturn("queue1");
when(resReq.getReservationId()).thenReturn(ReservationId.newInstance(now, 1));
// First, ask the policy to place the reservation.
FederationRouterPolicy routerPolicy = (FederationRouterPolicy) getPolicy();
SubClusterId chosen = routerPolicy.getReservationHomeSubcluster(resReq);
// Record the reservation -> sub-cluster mapping in the store.
FederationStateStoreFacade facade =
getFederationPolicyContext().getFederationStateStoreFacade();
ReservationHomeSubCluster subCluster =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen);
facade.addReservationHomeSubCluster(subCluster);
// Fetch all active sub-clusters (the result is not used further in this test).
Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
// Update the ReservationHomeSubCluster.
// The new home is not picked from all sub-clusters: only sub-clusters whose
// router policy weight is >= 1.0 are candidates.
WeightedPolicyInfo weightedPolicyInfo = this.getPolicyInfo();
Map<SubClusterIdInfo, Float> routerPolicyWeights = weightedPolicyInfo.getRouterPolicyWeights();
List<SubClusterId> subClusterIds = new ArrayList<>();
for (Map.Entry<SubClusterIdInfo, Float> entry : routerPolicyWeights.entrySet()) {
SubClusterIdInfo subClusterIdInfo = entry.getKey();
Float subClusterWeight = entry.getValue();
if (subClusterWeight >= 1.0) {
subClusterIds.add(subClusterIdInfo.toId());
}
}
// Pick one of the candidates at random and store it as the new home.
SubClusterId chosen2 = subClusterIds.get(this.getRand().nextInt(subClusterIds.size()));
ReservationHomeSubCluster subCluster2 =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen2);
facade.updateReservationHomeSubCluster(subCluster2);
// Route an application that uses this reservation.
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(now, 1), "app1", "queue1", Priority.newInstance(1),
null, false, false, 1, null, null, false);
applicationSubmissionContext.setReservationID(resReq.getReservationId());
SubClusterId chosen3 = routerPolicy.getHomeSubcluster(
applicationSubmissionContext, new ArrayList<>());
// The application must land on the updated home sub-cluster.
Assert.assertEquals(chosen2, chosen3);
}
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@ -38,6 +39,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.util.MonotonicClock;
@ -178,4 +181,12 @@ public class FederationStateStoreTestUtil {
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
}
/**
 * Queries the state store for the home sub-cluster of a reservation.
 *
 * @param reservationId the reservation to look up.
 * @return the home SubClusterId carried by the store's response.
 * @throws YarnException if the state store lookup fails.
 */
public SubClusterId queryReservationHomeSC(ReservationId reservationId)
    throws YarnException {
  return stateStore
      .getReservationHomeSubCluster(
          GetReservationHomeSubClusterRequest.newInstance(reservationId))
      .getReservationHomeSubCluster()
      .getHomeSubCluster();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.token.Token;
@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -1035,4 +1037,9 @@ public class MockRM extends ResourceManager {
public RMStateStore getRMStateStore() {
return getRMContext().getStateStore();
}
/**
 * Exposes the {@code reservationSystem} field of this instance.
 *
 * @return the value of {@code this.reservationSystem}.
 */
@VisibleForTesting
public ReservationSystem getReservationSystem(){
return this.reservationSystem;
}
}

View File

@ -97,6 +97,14 @@ public final class RouterMetrics {
private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved;
@Metric("# of getNodesToAttributes failed to be retrieved")
private MutableGaugeInt numGetNodesToAttributesFailedRetrieved;
@Metric("# of getNewReservation failed to be retrieved")
private MutableGaugeInt numGetNewReservationFailedRetrieved;
@Metric("# of submitReservation failed to be retrieved")
private MutableGaugeInt numSubmitReservationFailedRetrieved;
// Gauge of failed updateReservation calls. The description must name
// updateReservation; it previously duplicated the submitReservation text.
@Metric("# of updateReservation failed to be retrieved")
private MutableGaugeInt numUpdateReservationFailedRetrieved;
@Metric("# of deleteReservation failed to be retrieved")
private MutableGaugeInt numDeleteReservationFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -155,6 +163,14 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetClusterNodeAttributesRetrieved;
@Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)")
private MutableRate totalSucceededGetNodesToAttributesRetrieved;
@Metric("Total number of successful Retrieved GetNewReservation and latency(ms)")
private MutableRate totalSucceededGetNewReservationRetrieved;
@Metric("Total number of successful Retrieved SubmitReservation and latency(ms)")
private MutableRate totalSucceededSubmitReservationRetrieved;
@Metric("Total number of successful Retrieved UpdateReservation and latency(ms)")
private MutableRate totalSucceededUpdateReservationRetrieved;
@Metric("Total number of successful Retrieved DeleteReservation and latency(ms)")
private MutableRate totalSucceededDeleteReservationRetrieved;
/**
* Provide quantile counters for all latencies.
@ -186,8 +202,11 @@ public final class RouterMetrics {
private MutableQuantiles getResourceProfileLatency;
private MutableQuantiles getAttributesToNodesLatency;
private MutableQuantiles getClusterNodeAttributesLatency;
private MutableQuantiles getNodesToAttributesLatency;
private MutableQuantiles getNewReservationLatency;
private MutableQuantiles submitReservationLatency;
private MutableQuantiles updateReservationLatency;
private MutableQuantiles deleteReservationLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -298,6 +317,22 @@ public final class RouterMetrics {
getNodesToAttributesLatency =
registry.newQuantiles("getNodesToAttributesLatency",
"latency of get nodes to attributes timeouts", "ops", "latency", 10);
getNewReservationLatency =
registry.newQuantiles("getNewReservationLatency",
"latency of get new reservation timeouts", "ops", "latency", 10);
submitReservationLatency =
registry.newQuantiles("submitReservationLatency",
"latency of submit reservation timeouts", "ops", "latency", 10);
updateReservationLatency =
registry.newQuantiles("updateReservationLatency",
"latency of update reservation timeouts", "ops", "latency", 10);
deleteReservationLatency =
registry.newQuantiles("deleteReservationLatency",
"latency of delete reservation timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -459,6 +494,26 @@ public final class RouterMetrics {
return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples();
}
/**
 * Returns the sample count of the last stat recorded for successful
 * getNewReservation calls.
 *
 * @return number of samples in totalSucceededGetNewReservationRetrieved.
 */
@VisibleForTesting
public long getNumSucceededGetNewReservationRetrieved() {
return totalSucceededGetNewReservationRetrieved.lastStat().numSamples();
}
/**
 * Returns the sample count of the last stat recorded for successful
 * submitReservation calls.
 *
 * @return number of samples in totalSucceededSubmitReservationRetrieved.
 */
@VisibleForTesting
public long getNumSucceededSubmitReservationRetrieved() {
return totalSucceededSubmitReservationRetrieved.lastStat().numSamples();
}
/**
 * Returns the sample count of the last stat recorded for successful
 * updateReservation calls.
 *
 * @return number of samples in totalSucceededUpdateReservationRetrieved.
 */
@VisibleForTesting
public long getNumSucceededUpdateReservationRetrieved() {
return totalSucceededUpdateReservationRetrieved.lastStat().numSamples();
}
/**
 * Returns the sample count of the last stat recorded for successful
 * deleteReservation calls.
 *
 * @return number of samples in totalSucceededDeleteReservationRetrieved.
 */
@VisibleForTesting
public long getNumSucceededDeleteReservationRetrieved() {
return totalSucceededDeleteReservationRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -599,6 +654,26 @@ public final class RouterMetrics {
return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean();
}
/**
 * Returns the mean of the last stat recorded for successful
 * getNewReservation calls.
 *
 * @return mean of totalSucceededGetNewReservationRetrieved.
 */
@VisibleForTesting
public double getLatencySucceededGetNewReservationRetrieved() {
return totalSucceededGetNewReservationRetrieved.lastStat().mean();
}
/**
 * Returns the mean of the last stat recorded for successful
 * submitReservation calls.
 *
 * @return mean of totalSucceededSubmitReservationRetrieved.
 */
@VisibleForTesting
public double getLatencySucceededSubmitReservationRetrieved() {
return totalSucceededSubmitReservationRetrieved.lastStat().mean();
}
/**
 * Returns the mean of the last stat recorded for successful
 * updateReservation calls.
 *
 * @return mean of totalSucceededUpdateReservationRetrieved.
 */
@VisibleForTesting
public double getLatencySucceededUpdateReservationRetrieved() {
return totalSucceededUpdateReservationRetrieved.lastStat().mean();
}
/**
 * Returns the mean of the last stat recorded for successful
 * deleteReservation calls.
 *
 * @return mean of totalSucceededDeleteReservationRetrieved.
 */
@VisibleForTesting
public double getLatencySucceededDeleteReservationRetrieved() {
return totalSucceededDeleteReservationRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -732,6 +807,22 @@ public final class RouterMetrics {
return numGetNodesToAttributesFailedRetrieved.value();
}
/**
 * Returns the current value of the getNewReservation failure gauge.
 *
 * @return value of numGetNewReservationFailedRetrieved.
 */
public int getNewReservationFailedRetrieved() {
return numGetNewReservationFailedRetrieved.value();
}
/**
 * Returns the current value of the submitReservation failure gauge.
 *
 * @return value of numSubmitReservationFailedRetrieved.
 */
public int getSubmitReservationFailedRetrieved() {
return numSubmitReservationFailedRetrieved.value();
}
/**
 * Returns the current value of the updateReservation failure gauge.
 *
 * @return value of numUpdateReservationFailedRetrieved.
 */
public int getUpdateReservationFailedRetrieved() {
return numUpdateReservationFailedRetrieved.value();
}
/**
 * Returns the current value of the deleteReservation failure gauge.
 *
 * @return value of numDeleteReservationFailedRetrieved.
 */
public int getDeleteReservationFailedRetrieved() {
return numDeleteReservationFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -872,6 +963,26 @@ public final class RouterMetrics {
getNodesToAttributesLatency.add(duration);
}
/**
 * Records a successful getNewReservation call in both the aggregate rate
 * and the latency quantiles.
 *
 * @param duration the value added to both metrics.
 */
public void succeededGetNewReservationRetrieved(long duration) {
totalSucceededGetNewReservationRetrieved.add(duration);
getNewReservationLatency.add(duration);
}
/**
 * Records a successful submitReservation call in both the aggregate rate
 * and the latency quantiles.
 *
 * @param duration the value added to both metrics.
 */
public void succeededSubmitReservationRetrieved(long duration) {
totalSucceededSubmitReservationRetrieved.add(duration);
submitReservationLatency.add(duration);
}
/**
 * Records a successful updateReservation call in both the aggregate rate
 * and the latency quantiles.
 *
 * @param duration the value added to both metrics.
 */
public void succeededUpdateReservationRetrieved(long duration) {
totalSucceededUpdateReservationRetrieved.add(duration);
updateReservationLatency.add(duration);
}
/**
 * Records a successful deleteReservation call in both the aggregate rate
 * and the latency quantiles.
 *
 * @param duration the value added to both metrics.
 */
public void succeededDeleteReservationRetrieved(long duration) {
totalSucceededDeleteReservationRetrieved.add(duration);
deleteReservationLatency.add(duration);
}
/**
 * Increments the numAppsFailedCreated gauge by one.
 */
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -983,4 +1094,20 @@ public final class RouterMetrics {
public void incrGetNodesToAttributesFailedRetrieved() {
numGetNodesToAttributesFailedRetrieved.incr();
}
/**
 * Increments the getNewReservation failure gauge by one.
 */
public void incrGetNewReservationFailedRetrieved() {
numGetNewReservationFailedRetrieved.incr();
}
/**
 * Increments the submitReservation failure gauge by one.
 */
public void incrSubmitReservationFailedRetrieved() {
numSubmitReservationFailedRetrieved.incr();
}
/**
 * Increments the updateReservation failure gauge by one.
 */
public void incrUpdateReservationFailedRetrieved() {
numUpdateReservationFailedRetrieved.incr();
}
/**
 * Increments the deleteReservation failure gauge by one.
 */
public void incrDeleteReservationFailedRetrieved() {
numDeleteReservationFailedRetrieved.incr();
}
}

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRespo
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -124,6 +125,7 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -888,13 +890,88 @@ public class FederationClientInterceptor
/**
 * Obtains a new reservation from one of the active sub-clusters.
 *
 * Each attempt picks a sub-cluster via getRandomActiveSubCluster and forwards
 * the request to it. A sub-cluster whose call throws is removed from the local
 * candidate map before the next attempt. At most numSubmitRetries attempts
 * are made.
 *
 * @param request the getNewReservation request; a null request is rejected.
 * @return the first non-null response returned by a sub-cluster.
 * @throws YarnException if the request is null or no attempt produced a
 *         response.
 * @throws IOException as declared by the overridden method signature.
 */
@Override
public GetNewReservationResponse getNewReservation(
    GetNewReservationRequest request) throws YarnException, IOException {

  // The former "throw new NotImplementedException" placeholder is gone:
  // it made everything below unreachable.
  if (request == null) {
    routerMetrics.incrGetNewReservationFailedRetrieved();
    String errMsg = "Missing getNewReservation request.";
    RouterServerUtil.logAndThrowException(errMsg, null);
  }

  long startTime = clock.getTime();
  Map<SubClusterId, SubClusterInfo> subClustersActive = federationFacade.getSubClusters(true);

  for (int i = 0; i < numSubmitRetries; ++i) {
    SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
    LOG.info("getNewReservation try #{} on SubCluster {}.", i, subClusterId);
    ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
    try {
      GetNewReservationResponse response = clientRMProxy.getNewReservation(request);
      if (response != null) {
        long stopTime = clock.getTime();
        routerMetrics.succeededGetNewReservationRetrieved(stopTime - startTime);
        return response;
      }
    } catch (Exception e) {
      LOG.warn("Unable to create a new Reservation in SubCluster {}.", subClusterId.getId(), e);
      // Do not pick this sub-cluster again on the following attempts.
      subClustersActive.remove(subClusterId);
    }
  }

  // Every attempt either threw or returned null.
  routerMetrics.incrGetNewReservationFailedRetrieved();
  String errMsg = "Failed to create a new reservation.";
  throw new YarnException(errMsg);
}
/**
 * Submits a reservation to a sub-cluster selected by the router policy.
 *
 * For each attempt (at most numSubmitRetries): a home sub-cluster is obtained
 * from the policy facade, the reservation-to-sub-cluster mapping is added to
 * or updated in the federation state store, and the request is forwarded to
 * that sub-cluster. Any exception during an attempt is logged and the next
 * attempt is started.
 *
 * @param request the submission request; it must carry a reservation id,
 *        a reservation definition and a queue.
 * @return the first non-null response returned by a sub-cluster.
 * @throws YarnException if the request is incomplete or no attempt produced
 *         a response.
 * @throws IOException as declared by the overridden method signature.
 */
@Override
public ReservationSubmissionResponse submitReservation(
    ReservationSubmissionRequest request) throws YarnException, IOException {

  // The former "throw new NotImplementedException" placeholder is gone:
  // it made everything below unreachable.
  if (request == null || request.getReservationId() == null
      || request.getReservationDefinition() == null || request.getQueue() == null) {
    routerMetrics.incrSubmitReservationFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Missing submitReservation request or reservationId " +
        "or reservation definition or queue.", null);
  }

  long startTime = clock.getTime();
  ReservationId reservationId = request.getReservationId();

  for (int i = 0; i < numSubmitRetries; i++) {
    try {
      // First, Get SubClusterId according to specific strategy.
      SubClusterId subClusterId = policyFacade.getReservationHomeSubCluster(request);
      LOG.info("submitReservation ReservationId {} try #{} on SubCluster {}.",
          reservationId, i, subClusterId);
      ReservationHomeSubCluster reservationHomeSubCluster =
          ReservationHomeSubCluster.newInstance(reservationId, subClusterId);

      // Second, determine whether the current ReservationId has a corresponding subCluster.
      // If it does not exist, add it. If it exists, update it.
      Boolean exists = existsReservationHomeSubCluster(reservationId);
      if (!exists) {
        addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
      } else {
        updateReservationHomeSubCluster(subClusterId, reservationId, reservationHomeSubCluster);
      }

      // Third, Submit a Reservation request to the subCluster
      ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
      ReservationSubmissionResponse response = clientRMProxy.submitReservation(request);
      if (response != null) {
        LOG.info("Reservation {} submitted on subCluster {}.", reservationId, subClusterId);
        long stopTime = clock.getTime();
        routerMetrics.succeededSubmitReservationRetrieved(stopTime - startTime);
        return response;
      }
    } catch (Exception e) {
      // Best effort: log and fall through to the next attempt.
      LOG.warn("Unable to submit(try #{}) the Reservation {}.", i, reservationId, e);
    }
  }

  // Every attempt either threw or returned null.
  routerMetrics.incrSubmitReservationFailedRetrieved();
  String msg = String.format("Reservation %s failed to be submitted.", reservationId);
  throw new YarnException(msg);
}
@Override
@ -925,13 +1002,68 @@ public class FederationClientInterceptor
/**
 * Forwards a reservation update to the reservation's home sub-cluster.
 *
 * The home sub-cluster is resolved through getReservationHomeSubCluster and
 * the request is sent to it once; there is no retry.
 *
 * @param request the update request; it must carry a reservation id and a
 *        reservation definition.
 * @return the non-null response returned by the home sub-cluster.
 * @throws YarnException if the request is incomplete, the home sub-cluster
 *         cannot be resolved, the call fails, or the response is null.
 * @throws IOException as declared by the overridden method signature.
 */
@Override
public ReservationUpdateResponse updateReservation(
    ReservationUpdateRequest request) throws YarnException, IOException {

  // The former "throw new NotImplementedException" placeholder is gone:
  // it made everything below unreachable.
  if (request == null || request.getReservationId() == null
      || request.getReservationDefinition() == null) {
    routerMetrics.incrUpdateReservationFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Missing updateReservation request or reservationId or reservation definition.", null);
  }

  long startTime = clock.getTime();
  ReservationId reservationId = request.getReservationId();
  SubClusterId subClusterId = getReservationHomeSubCluster(reservationId);

  try {
    ApplicationClientProtocol client = getClientRMProxyForSubCluster(subClusterId);
    ReservationUpdateResponse response = client.updateReservation(request);
    if (response != null) {
      long stopTime = clock.getTime();
      routerMetrics.succeededUpdateReservationRetrieved(stopTime - startTime);
      return response;
    }
  } catch (Exception ex) {
    routerMetrics.incrUpdateReservationFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Unable to reservation update due to exception.", ex);
  }

  // Reached when the sub-cluster returned a null response.
  routerMetrics.incrUpdateReservationFailedRetrieved();
  String msg = String.format("Reservation %s failed to be update.", reservationId);
  throw new YarnException(msg);
}
/**
 * Forwards a reservation delete to the reservation's home sub-cluster and,
 * on a non-null response, removes the reservation's home sub-cluster mapping
 * through the federation facade.
 *
 * @param request the delete request; it must carry a reservation id.
 * @return the non-null response returned by the home sub-cluster.
 * @throws YarnException if the request is incomplete, the home sub-cluster
 *         cannot be resolved, the call fails, or the response is null.
 * @throws IOException as declared by the overridden method signature.
 */
@Override
public ReservationDeleteResponse deleteReservation(
    ReservationDeleteRequest request) throws YarnException, IOException {

  // The former "throw new NotImplementedException" placeholder is gone:
  // it made everything below unreachable.
  if (request == null || request.getReservationId() == null) {
    routerMetrics.incrDeleteReservationFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Missing deleteReservation request or reservationId.", null);
  }

  long startTime = clock.getTime();
  ReservationId reservationId = request.getReservationId();
  SubClusterId subClusterId = getReservationHomeSubCluster(reservationId);

  try {
    ApplicationClientProtocol client = getClientRMProxyForSubCluster(subClusterId);
    ReservationDeleteResponse response = client.deleteReservation(request);
    if (response != null) {
      federationFacade.deleteReservationHomeSubCluster(reservationId);
      long stopTime = clock.getTime();
      routerMetrics.succeededDeleteReservationRetrieved(stopTime - startTime);
      return response;
    }
  } catch (Exception ex) {
    // Count this against the delete metric; the update counter was
    // incremented here by mistake before.
    routerMetrics.incrDeleteReservationFailedRetrieved();
    RouterServerUtil.logAndThrowException(
        "Unable to reservation delete due to exception.", ex);
  }

  // Reached when the sub-cluster returned a null response.
  routerMetrics.incrDeleteReservationFailedRetrieved();
  String msg = String.format("Reservation %s failed to be delete.", reservationId);
  throw new YarnException(msg);
}
private <R> Collection<R> invokeAppClientProtocolMethod(
@ -1582,7 +1714,7 @@ public class FederationClientInterceptor
getApplicationHomeSubCluster(applicationId);
} catch (YarnException ex) {
if(LOG.isDebugEnabled()){
LOG.debug("can't find applicationId = {} in home sub cluster, " +
LOG.debug("Can't find applicationId = {} in home sub cluster, " +
" try foreach sub clusters.", applicationId);
}
}
@ -1614,13 +1746,38 @@ public class FederationClientInterceptor
} catch (Exception ex) {
if(LOG.isDebugEnabled()){
LOG.debug("Can't Find ApplicationId = {} in Sub Cluster!", applicationId);
LOG.debug("Can't find applicationId = {} in Sub Cluster!", applicationId);
}
}
}
String errorMsg =
String.format("Can't Found applicationId = %s in any sub clusters", applicationId);
String.format("Can't find applicationId = %s in any sub clusters", applicationId);
throw new YarnException(errorMsg);
}
/**
 * Resolves the home sub-cluster of a reservation through the federation facade.
 *
 * @param reservationId the reservation to look up; when null an error is
 *        logged and null is returned.
 * @return the home SubClusterId, or null when reservationId is null.
 * @throws YarnException if the facade lookup fails or yields no sub-cluster.
 */
protected SubClusterId getReservationHomeSubCluster(ReservationId reservationId)
    throws YarnException {

  if (reservationId == null) {
    LOG.error("ReservationId is Null, Can't find in SubCluster.");
    return null;
  }

  // Ask the facade for the reservation's home sub-cluster.
  SubClusterId homeSubCluster = null;
  try {
    homeSubCluster = federationFacade.getReservationHomeSubCluster(reservationId);
  } catch (YarnException e) {
    RouterServerUtil.logAndThrowException(e,
        "Can't find reservationId = %s in home sub cluster.", reservationId);
  }

  if (homeSubCluster == null) {
    throw new YarnException(
        String.format("Can't find reservationId = %s in home sub cluster.", reservationId));
  }
  return homeSubCluster;
}
@ -1633,4 +1790,49 @@ public class FederationClientInterceptor
public Map<SubClusterId, ApplicationClientProtocol> getClientRMProxies() {
return clientRMProxies;
}
/**
 * Tells whether the federation facade returns a home sub-cluster for the
 * given reservation.
 *
 * @param reservationId the reservation to look up.
 * @return true if the facade returns a non-null SubClusterId; false if it
 *         returns null or the lookup throws a YarnException (which is logged).
 */
private Boolean existsReservationHomeSubCluster(ReservationId reservationId) {
  try {
    return federationFacade.getReservationHomeSubCluster(reservationId) != null;
  } catch (YarnException e) {
    LOG.warn("get homeSubCluster by reservationId = {} error.", reservationId, e);
    return false;
  }
}
/**
 * Stores the reservation-to-home-sub-cluster mapping through the federation
 * facade.
 *
 * @param reservationId the reservation id, used only in the error message.
 * @param homeSubCluster the mapping handed to the facade.
 * @throws YarnException if the facade call fails; the failure is passed to
 *         RouterServerUtil.logAndThrowException.
 */
private void addReservationHomeSubCluster(ReservationId reservationId,
ReservationHomeSubCluster homeSubCluster) throws YarnException {
try {
// persist the mapping of reservationId and the subClusterId which has
// been selected as its home
federationFacade.addReservationHomeSubCluster(homeSubCluster);
} catch (YarnException e) {
RouterServerUtil.logAndThrowException(e,
"Unable to insert the ReservationId %s into the FederationStateStore.",
reservationId);
}
}
/**
 * Updates the reservation-to-home-sub-cluster mapping through the federation
 * facade.
 *
 * If the update fails, the mapping currently held by the facade is read back.
 * When it already names the sub-cluster we selected, the failure is treated as
 * harmless and only logged; otherwise it is reported through
 * RouterServerUtil.logAndThrowException.
 *
 * @param subClusterId the sub-cluster selected as the new home.
 * @param reservationId the reservation whose mapping is updated.
 * @param homeSubCluster the mapping handed to the facade.
 * @throws YarnException if the update fails and the stored home differs from
 *         subClusterId, or if reading the stored home fails.
 */
private void updateReservationHomeSubCluster(SubClusterId subClusterId,
    ReservationId reservationId, ReservationHomeSubCluster homeSubCluster) throws YarnException {
  try {
    // update the mapping of reservationId and the home subClusterId to
    // the new subClusterId we have selected
    federationFacade.updateReservationHomeSubCluster(homeSubCluster);
  } catch (YarnException e) {
    SubClusterId subClusterIdInStateStore =
        federationFacade.getReservationHomeSubCluster(reservationId);
    // Compare by value: the id read back may be a different object instance
    // than the one we selected, so reference equality (==) is not a reliable test.
    if (subClusterId != null && subClusterId.equals(subClusterIdInStateStore)) {
      LOG.info("Reservation {} already submitted on SubCluster {}.",
          reservationId, subClusterId);
    } else {
      RouterServerUtil.logAndThrowException(e,
          "Unable to update the ReservationId %s into the FederationStateStore.",
          reservationId);
    }
  }
}
}

View File

@ -453,6 +453,26 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getNodesToAttributesFailed call");
metrics.incrGetNodesToAttributesFailedRetrieved();
}
/**
 * Logs a mocked failure and increments the getNewReservation failure
 * counter on {@code metrics}.
 */
public void getNewReservationFailed() {
LOG.info("Mocked: failed getNewReservationFailed call");
metrics.incrGetNewReservationFailedRetrieved();
}
/**
 * Logs a mocked failure and increments the submitReservation failure
 * counter on {@code metrics}.
 */
public void getSubmitReservationFailed() {
LOG.info("Mocked: failed getSubmitReservationFailed call");
metrics.incrSubmitReservationFailedRetrieved();
}
/**
 * Logs a mocked failure and increments the updateReservation failure
 * counter on {@code metrics}.
 */
public void getUpdateReservationFailed() {
LOG.info("Mocked: failed getUpdateReservationFailed call");
metrics.incrUpdateReservationFailedRetrieved();
}
/**
 * Logs a mocked failure and increments the deleteReservation failure
 * counter on {@code metrics}.
 */
public void getDeleteReservationFailed() {
LOG.info("Mocked: failed getDeleteReservationFailed call");
metrics.incrDeleteReservationFailedRetrieved();
}
}
// Records successes for all calls
@ -603,6 +623,26 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful getNodesToAttributes call with duration {}", duration);
metrics.succeededGetNodesToAttributesRetrieved(duration);
}
/**
 * Logs a mocked success and records it as a successful getNewReservation
 * call on {@code metrics}.
 *
 * @param duration the duration passed to the metric.
 */
public void getNewReservationRetrieved(long duration) {
LOG.info("Mocked: successful getNewReservation call with duration {}", duration);
metrics.succeededGetNewReservationRetrieved(duration);
}
/**
 * Logs a mocked success and records it as a successful submitReservation
 * call on {@code metrics}.
 *
 * @param duration the duration passed to the metric.
 */
public void getSubmitReservationRetrieved(long duration) {
LOG.info("Mocked: successful getSubmitReservation call with duration {}", duration);
metrics.succeededSubmitReservationRetrieved(duration);
}
/**
 * Logs a mocked success and records it as a successful updateReservation
 * call on {@code metrics}.
 *
 * @param duration the duration passed to the metric.
 */
public void getUpdateReservationRetrieved(long duration) {
LOG.info("Mocked: successful getUpdateReservation call with duration {}", duration);
metrics.succeededUpdateReservationRetrieved(duration);
}
/**
 * Logs a mocked success and records it as a successful deleteReservation
 * call on {@code metrics}.
 *
 * @param duration the duration passed to the metric.
 */
public void getDeleteReservationRetrieved(long duration) {
LOG.info("Mocked: successful getDeleteReservation call with duration {}", duration);
metrics.succeededDeleteReservationRetrieved(duration);
}
}
@Test
@ -1069,4 +1109,96 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadBefore + 1,
metrics.getNodesToAttributesFailedRetrieved());
}
/**
 * Records two successful getNewReservation calls (150, then 300) and checks
 * that the success count grows by one each time and that the reported mean
 * latency is 150 and then 225.
 */
@Test
public void testGetNewReservationRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetNewReservationRetrieved();
goodSubCluster.getNewReservationRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetNewReservationRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetNewReservationRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getNewReservationRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetNewReservationRetrieved());
// Mean of 150 and 300.
Assert.assertEquals(225,
metrics.getLatencySucceededGetNewReservationRetrieved(), ASSERT_DOUBLE_DELTA);
}
/**
 * Records one failed getNewReservation call and checks that the failure
 * counter grows by one.
 */
@Test
public void testGetNewReservationRetrievedFailed() {
long totalBadBefore = metrics.getNewReservationFailedRetrieved();
badSubCluster.getNewReservationFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getNewReservationFailedRetrieved());
}
/**
 * Records two successful submitReservation calls (150, then 300) and checks
 * that the success count grows by one each time and that the reported mean
 * latency is 150 and then 225.
 */
@Test
public void testGetSubmitReservationRetrieved() {
long totalGoodBefore = metrics.getNumSucceededSubmitReservationRetrieved();
goodSubCluster.getSubmitReservationRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededSubmitReservationRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededSubmitReservationRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getSubmitReservationRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededSubmitReservationRetrieved());
// Mean of 150 and 300.
Assert.assertEquals(225,
metrics.getLatencySucceededSubmitReservationRetrieved(), ASSERT_DOUBLE_DELTA);
}
/**
 * Records one failed submitReservation call and checks that the failure
 * counter grows by one.
 */
@Test
public void testGetSubmitReservationRetrievedFailed() {
long totalBadBefore = metrics.getSubmitReservationFailedRetrieved();
badSubCluster.getSubmitReservationFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getSubmitReservationFailedRetrieved());
}
/**
 * Records two successful updateReservation calls (150, then 300) and checks
 * that the success count grows by one each time and that the reported mean
 * latency is 150 and then 225.
 */
@Test
public void testGetUpdateReservationRetrieved() {
long totalGoodBefore = metrics.getNumSucceededUpdateReservationRetrieved();
goodSubCluster.getUpdateReservationRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededUpdateReservationRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededUpdateReservationRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getUpdateReservationRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededUpdateReservationRetrieved());
// Mean of 150 and 300.
Assert.assertEquals(225,
metrics.getLatencySucceededUpdateReservationRetrieved(), ASSERT_DOUBLE_DELTA);
}
/**
 * Records one failed updateReservation call and checks that the failure
 * counter grows by one.
 */
@Test
public void testGetUpdateReservationRetrievedFailed() {
long totalBadBefore = metrics.getUpdateReservationFailedRetrieved();
badSubCluster.getUpdateReservationFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getUpdateReservationFailedRetrieved());
}
/**
 * Records two successful deleteReservation calls (150, then 300) and checks
 * that the success count grows by one each time and that the reported mean
 * latency is 150 and then 225.
 */
@Test
public void testGetDeleteReservationRetrieved() {
long totalGoodBefore = metrics.getNumSucceededDeleteReservationRetrieved();
goodSubCluster.getDeleteReservationRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededDeleteReservationRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededDeleteReservationRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getDeleteReservationRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededDeleteReservationRetrieved());
// Mean of 150 and 300.
Assert.assertEquals(225,
metrics.getLatencySucceededDeleteReservationRetrieved(), ASSERT_DOUBLE_DELTA);
}
/**
 * Records one failed deleteReservation call and checks that the failure
 * counter grows by one.
 */
@Test
public void testGetDeleteReservationRetrievedFailed() {
long totalBadBefore = metrics.getDeleteReservationFailedRetrieved();
badSubCluster.getDeleteReservationFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getDeleteReservationFailedRetrieved());
}
}

View File

@ -29,8 +29,11 @@ import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.Arrays;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
@ -88,6 +91,14 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesReques
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -107,6 +118,10 @@ import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -117,6 +132,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUt
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -149,6 +165,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private final static int APP_PRIORITY_ZERO = 0;
private final static long DEFAULT_DURATION = 10 * 60 * 1000;
@Override
public void setUp() {
super.setUpConfig();
@ -175,6 +193,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
Assert.fail();
}
DefaultMetricsSystem.setMiniClusterMode(true);
}
@Override
@ -203,6 +222,12 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
// Disable StateStoreFacade cache
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
conf.setInt("yarn.scheduler.maximum-allocation-mb", 100 * 1024);
conf.setInt("yarn.scheduler.maximum-allocation-vcores", 100);
return conf;
}
@ -1254,4 +1279,247 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
NodeAttributeType.STRING, "nvida");
Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
}
/**
 * Exercises {@code interceptor.getNewReservation}.
 *
 * <p>A {@code null} request is expected to raise a {@link YarnException}
 * carrying the "Missing getNewReservation request." message. A regular
 * request must return a non-null response whose reservation id is non-null,
 * contains the text "reservation" in its string form, and carries the
 * cluster timestamp reported by {@code ResourceManager.getClusterTimeStamp()}.
 *
 * @throws Exception if the interceptor call or the intercept helper fails
 */
@Test
public void testGetNewReservation() throws Exception {
  LOG.info("Test FederationClientInterceptor : Get NewReservation request.");

  // null request
  LambdaTestUtils.intercept(YarnException.class,
      "Missing getNewReservation request.", () -> interceptor.getNewReservation(null));

  // normal request
  GetNewReservationRequest request = GetNewReservationRequest.newInstance();
  GetNewReservationResponse response = interceptor.getNewReservation(request);
  Assert.assertNotNull(response);

  ReservationId reservationId = response.getReservationId();
  Assert.assertNotNull(reservationId);
  Assert.assertTrue(reservationId.toString().contains("reservation"));

  // JUnit convention is (expected, actual); keeping that order makes a
  // failure message label the two timestamps correctly.
  Assert.assertEquals(ResourceManager.getClusterTimeStamp(),
      reservationId.getClusterTimestamp());
}
/**
 * Submits a single reservation through the interceptor and checks that the
 * state store afterwards reports a home sub-cluster for it that is one of
 * the known {@code subClusters}.
 *
 * @throws Exception if any interceptor or state-store call fails
 */
@Test
public void testSubmitReservation() throws Exception {
  LOG.info("Test FederationClientInterceptor : SubmitReservation request.");

  // Obtain a fresh reservation id from the interceptor.
  GetNewReservationResponse newReservation =
      interceptor.getNewReservation(GetNewReservationRequest.newInstance());
  Assert.assertNotNull(newReservation);

  // Let the plan follower catch up: manually synchronize the "root.decided"
  // plan on every mock RM before submitting.
  for (MockRM rm : interceptor.getMockRMs().values()) {
    rm.getReservationSystem().synchronizePlan("root.decided", true);
  }

  // Submit the reservation against the "decided" queue.
  ReservationId newReservationId = newReservation.getReservationId();
  ReservationSubmissionRequest submission = ReservationSubmissionRequest.newInstance(
      createReservationDefinition(1024, 1), "decided", newReservationId);
  Assert.assertNotNull(interceptor.submitReservation(submission));

  // The reservation must now be mapped to one of the registered sub-clusters.
  SubClusterId homeSubCluster = stateStoreUtil.queryReservationHomeSC(newReservationId);
  Assert.assertNotNull(homeSubCluster);
  Assert.assertTrue(subClusters.contains(homeSubCluster));
}
/**
 * Checks that {@code interceptor.submitReservation} rejects incomplete
 * requests.
 *
 * <p>Five variants are tried: a {@code null} request, and requests in which
 * one or more of reservation definition, queue and reservation id are
 * {@code null}. Each call is expected to raise a {@link YarnException}
 * carrying the same "Missing submitReservation request ..." message.
 *
 * @throws Exception if the intercept helper does not observe the expected
 *           failure
 */
@Test
public void testSubmitReservationEmptyRequest() throws Exception {
  LOG.info("Test FederationClientInterceptor : SubmitReservation request empty.");

  String errorMsg =
      "Missing submitReservation request or reservationId or reservation definition or queue.";

  // Case 1: the request itself is null.
  LambdaTestUtils.intercept(YarnException.class, errorMsg,
      () -> interceptor.submitReservation(null));

  // Case 2: definition, queue and reservation id are all null.
  ReservationSubmissionRequest request2 =
      ReservationSubmissionRequest.newInstance(null, null, null);
  LambdaTestUtils.intercept(YarnException.class, errorMsg,
      () -> interceptor.submitReservation(request2));

  // Case 3: only the queue is provided.
  ReservationSubmissionRequest request3 =
      ReservationSubmissionRequest.newInstance(null, "q1", null);
  LambdaTestUtils.intercept(YarnException.class, errorMsg,
      () -> interceptor.submitReservation(request3));

  // Case 4: only the reservation id is provided.
  ReservationId reservationId = ReservationId.newInstance(Time.now(), 1);
  ReservationSubmissionRequest request4 =
      ReservationSubmissionRequest.newInstance(null, null, reservationId);
  LambdaTestUtils.intercept(YarnException.class, errorMsg,
      () -> interceptor.submitReservation(request4));

  // Case 5: definition and reservation id are provided, the queue is not.
  // The definition comes from the shared helper instead of being rebuilt
  // inline, so the test fixtures stay in one place.
  ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
  ReservationSubmissionRequest request5 =
      ReservationSubmissionRequest.newInstance(rDefinition, null, reservationId);
  LambdaTestUtils.intercept(YarnException.class, errorMsg,
      () -> interceptor.submitReservation(request5));
}
/**
 * Submits the same reservation request twice and checks that the home
 * sub-cluster recorded in the state store after the retry equals the one
 * recorded after the first submission.
 *
 * @throws Exception if any interceptor or state-store call fails
 */
@Test
public void testSubmitReservationMultipleSubmission() throws Exception {
  LOG.info("Test FederationClientInterceptor: Submit Reservation - Multiple");

  // Obtain a fresh reservation id from the interceptor.
  GetNewReservationResponse newReservation =
      interceptor.getNewReservation(GetNewReservationRequest.newInstance());
  Assert.assertNotNull(newReservation);

  // Let the plan follower catch up: manually synchronize the "root.decided"
  // plan on every mock RM before submitting.
  for (MockRM rm : interceptor.getMockRMs().values()) {
    rm.getReservationSystem().synchronizePlan("root.decided", true);
  }

  // First submission.
  ReservationId newReservationId = newReservation.getReservationId();
  ReservationSubmissionRequest submission = ReservationSubmissionRequest.newInstance(
      createReservationDefinition(1024, 1), "decided", newReservationId);
  Assert.assertNotNull(interceptor.submitReservation(submission));

  SubClusterId homeAfterFirstSubmit = stateStoreUtil.queryReservationHomeSC(newReservationId);
  Assert.assertNotNull(homeAfterFirstSubmit);
  Assert.assertTrue(subClusters.contains(homeAfterFirstSubmit));

  // Retry with the very same request object.
  Assert.assertNotNull(interceptor.submitReservation(submission));

  SubClusterId homeAfterRetry = stateStoreUtil.queryReservationHomeSC(newReservationId);
  Assert.assertNotNull(homeAfterRetry);
  Assert.assertEquals(homeAfterFirstSubmit, homeAfterRetry);
}
/**
 * Submits a reservation, then updates it with a larger definition through
 * {@code interceptor.updateReservation}.
 *
 * <p>The update must return a non-null response, and the state store must
 * still report a home sub-cluster for the reservation that is one of the
 * known {@code subClusters}.
 *
 * @throws Exception if any interceptor or state-store call fails
 */
@Test
public void testUpdateReservation() throws Exception {
  LOG.info("Test FederationClientInterceptor : UpdateReservation request.");

  // get new reservationId
  GetNewReservationRequest request = GetNewReservationRequest.newInstance();
  GetNewReservationResponse response = interceptor.getNewReservation(request);
  Assert.assertNotNull(response);

  // allow plan follower to synchronize, manually trigger an assignment
  Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
  for (MockRM mockRM : mockRMs.values()) {
    ReservationSystem reservationSystem = mockRM.getReservationSystem();
    reservationSystem.synchronizePlan("root.decided", true);
  }

  // Submit Reservation
  ReservationId reservationId = response.getReservationId();
  ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
  ReservationSubmissionRequest rSubmissionRequest = ReservationSubmissionRequest.newInstance(
      rDefinition, "decided", reservationId);
  ReservationSubmissionResponse submissionResponse =
      interceptor.submitReservation(rSubmissionRequest);
  Assert.assertNotNull(submissionResponse);

  // Update Reservation
  ReservationDefinition rDefinition2 = createReservationDefinition(2048, 1);
  ReservationUpdateRequest updateRequest =
      ReservationUpdateRequest.newInstance(rDefinition2, reservationId);
  ReservationUpdateResponse updateResponse =
      interceptor.updateReservation(updateRequest);
  Assert.assertNotNull(updateResponse);

  // The reservation must still be homed on a registered sub-cluster; this
  // mirrors the membership check made by the submit tests.
  SubClusterId subClusterId = stateStoreUtil.queryReservationHomeSC(reservationId);
  Assert.assertNotNull(subClusterId);
  Assert.assertTrue(subClusters.contains(subClusterId));
}
/**
 * Submits a reservation, deletes it through
 * {@code interceptor.deleteReservation}, and then expects a lookup of its
 * home sub-cluster to raise a {@link YarnException} carrying the
 * "Reservation ... does not exist" message.
 *
 * @throws Exception if any interceptor or state-store call fails
 *           unexpectedly
 */
@Test
public void testDeleteReservation() throws Exception {
  LOG.info("Test FederationClientInterceptor : DeleteReservation request.");

  // Obtain a fresh reservation id from the interceptor.
  GetNewReservationResponse newReservation =
      interceptor.getNewReservation(GetNewReservationRequest.newInstance());
  Assert.assertNotNull(newReservation);

  // Let the plan follower catch up: manually synchronize the "root.decided"
  // plan on every mock RM before submitting.
  for (MockRM rm : interceptor.getMockRMs().values()) {
    rm.getReservationSystem().synchronizePlan("root.decided", true);
  }

  // Submit the reservation that will be deleted.
  ReservationId targetId = newReservation.getReservationId();
  ReservationSubmissionRequest submission = ReservationSubmissionRequest.newInstance(
      createReservationDefinition(1024, 1), "decided", targetId);
  Assert.assertNotNull(interceptor.submitReservation(submission));

  // Delete it.
  ReservationDeleteRequest deletion = ReservationDeleteRequest.newInstance(targetId);
  Assert.assertNotNull(interceptor.deleteReservation(deletion));

  // Looking the reservation up afterwards must fail.
  String missingReservationMsg = "Reservation " + targetId + " does not exist";
  LambdaTestUtils.intercept(YarnException.class, missingReservationMsg,
      () -> stateStoreUtil.queryReservationHomeSC(targetId));
}
/**
 * Builds a ReservationDefinition holding a single ReservationRequest for
 * the given resource size.
 *
 * <p>The arrival time is {@code Time.now()} and the deadline is the arrival
 * time plus 1.1 times {@code DEFAULT_DURATION}. The request is created with
 * {@code ReservationRequest.newInstance(resource, 1, 1, DEFAULT_DURATION)}
 * and wrapped using {@code ReservationRequestInterpreter.R_ALL} and the
 * name "u1".
 *
 * @param memory memory value passed to {@code Resource.newInstance}
 * @param core core value passed to {@code Resource.newInstance}
 * @return the assembled ReservationDefinition
 */
private ReservationDefinition createReservationDefinition(int memory, int core) {
  long arrival = Time.now();
  // Keep the arithmetic in long: the duration is a long constant and is
  // added to a long timestamp, so narrowing it to int is unnecessary.
  long deadline = arrival + (long) (DEFAULT_DURATION * 1.1);
  ReservationRequest rRequest = ReservationRequest.newInstance(
      Resource.newInstance(memory, core), 1, 1, DEFAULT_DURATION);
  ReservationRequest[] rRequests = new ReservationRequest[] {rRequest};
  return createReservationDefinition(arrival, deadline, rRequests,
      ReservationRequestInterpreter.R_ALL, "u1");
}
/**
 * Assembles a ReservationDefinition from explicit timing and request data.
 *
 * <p>The request array is wrapped into a ReservationRequests object together
 * with the interpreter type, and the definition is then created with the
 * fixed trailing arguments {@code "0"} and {@code Priority.UNDEFINED}.
 *
 * @param arrival arrival time handed to the definition
 * @param deadline deadline handed to the definition
 * @param reservationRequests the individual reservation requests
 * @param rType how the requests relate to each other
 *          (a ReservationRequestInterpreter value)
 * @param username value passed as the fourth argument of
 *          {@code ReservationDefinition.newInstance}
 * @return the newly created ReservationDefinition
 */
private ReservationDefinition createReservationDefinition(long arrival,
    long deadline, ReservationRequest[] reservationRequests,
    ReservationRequestInterpreter rType, String username) {
  ReservationRequests bundledRequests =
      ReservationRequests.newInstance(Arrays.asList(reservationRequests), rType);
  // NOTE(review): the literal "0" is presumably the recurrence expression;
  // confirm against ReservationDefinition.newInstance.
  ReservationDefinition definition = ReservationDefinition.newInstance(
      arrival, deadline, bundledRequests, username, "0", Priority.UNDEFINED);
  return definition;
}
}

View File

@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -106,6 +107,8 @@ public abstract class BaseRouterRMAdminTest {
this.dispatcher.init(conf);
this.dispatcher.start();
this.rmAdminService = createAndStartRouterRMAdminService();
DefaultMetricsSystem.setMiniClusterMode(true);
}
@After

View File

@ -53,7 +53,7 @@
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>90</value>
<value>50</value>
<description>Default queue target capacity.</description>
</property>
@ -63,6 +63,12 @@
<description>target queue capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.decided.capacity</name>
<value>40</value>
<description>decided queue capacity.</description>
</property>
<property>
<name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
<value>1</value>