YARN-11297. [Federation] Improve Yarn Router Reservation Submission Code. (#4863)

This commit is contained in:
slfan1989 2022-09-10 01:39:00 +08:00 committed by GitHub
parent 8732625f50
commit e76ffbf102
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 17 deletions

View File

@ -949,7 +949,13 @@ public class FederationClientInterceptor
// Second, determine whether the current ReservationId has a corresponding subCluster.
// If it does not exist, add it. If it exists, update it.
Boolean exists = existsReservationHomeSubCluster(reservationId);
if (!exists) {
// We may encounter the situation of repeated submission of Reservation,
// at this time we should try to use the reservation that has been allocated
// !exists indicates that the reservation does not exist and needs to be added
// i==0, mainly to consider repeated submissions,
// so the first time to apply for reservation, try to use the original reservation
if (!exists || i == 0) {
addReservationHomeSubCluster(reservationId, reservationHomeSubCluster);
} else {
updateReservationHomeSubCluster(subClusterId, reservationId, reservationHomeSubCluster);

View File

@ -1308,13 +1308,6 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
GetNewReservationResponse response = interceptor.getNewReservation(request);
Assert.assertNotNull(response);
// allow plan follower to synchronize, manually trigger an assignment
Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
for (MockRM mockRM : mockRMs.values()) {
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan("root.decided", true);
}
// Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
@ -1384,13 +1377,6 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
GetNewReservationResponse response = interceptor.getNewReservation(request);
Assert.assertNotNull(response);
// allow plan follower to synchronize, manually trigger an assignment
Map<SubClusterId, MockRM> mockRMs = interceptor.getMockRMs();
for (MockRM mockRM : mockRMs.values()) {
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan("root.decided", true);
}
// First Submit Reservation
ReservationId reservationId = response.getReservationId();
ReservationDefinition rDefinition = createReservationDefinition(1024, 1);
@ -1404,10 +1390,12 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
Assert.assertNotNull(subClusterId1);
Assert.assertTrue(subClusters.contains(subClusterId1));
// First Retry
// First Retry, repeat the submission
ReservationSubmissionResponse submissionResponse1 =
interceptor.submitReservation(rSubmissionRequest);
Assert.assertNotNull(submissionResponse1);
// Expect reserved clusters to be consistent
SubClusterId subClusterId2 = stateStoreUtil.queryReservationHomeSC(reservationId);
Assert.assertNotNull(subClusterId2);
Assert.assertEquals(subClusterId1, subClusterId2);

View File

@ -28,13 +28,16 @@ import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@ -43,6 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
@ -95,6 +100,7 @@ public class TestableFederationClientInterceptor
mockRMs.put(subClusterId, mockRM);
}
initNodeAttributes(subClusterId, mockRM);
initReservationSystem(mockRM);
return mockRM.getClientRMService();
}
}
@ -167,6 +173,23 @@ public class TestableFederationClientInterceptor
}
}
private void initReservationSystem(MockRM mockRM) throws YarnException {
try {
// Ensure that the reserved resources of the RM#Reservation System are allocated
String planName = "root.decided";
ReservationSystem reservationSystem = mockRM.getReservationSystem();
reservationSystem.synchronizePlan(planName, true);
GenericTestUtils.waitFor(() -> {
Plan plan = reservationSystem.getPlan(planName);
Resource resource = plan.getTotalCapacity();
return (resource.getMemorySize() > 0 && resource.getVirtualCores() > 0);
}, 100, 2000);
} catch (TimeoutException | InterruptedException e) {
throw new YarnException(e);
}
}
@Override
public void shutdown() {
if (mockRMs != null && !mockRMs.isEmpty()) {
@ -193,4 +216,4 @@ public class TestableFederationClientInterceptor
mockRMs.clear();
super.shutdown();
}
}
}