YARN-3738. Add support for recovery of reserved apps running under dynamic queues (subru via asuresh)
(cherry picked from commit ab8eb8770c
)
This commit is contained in:
parent
fc5a433adb
commit
f0580dcac0
|
@ -485,6 +485,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-4296. DistributedShell Log.info is not friendly.
|
||||
(Xiaowei Wang via stevel)
|
||||
|
||||
YARN-3738. Add support for recovery of reserved apps running under dynamic
|
||||
queues (subru via asuresh)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
|
|
@ -1320,10 +1320,9 @@ public class CapacityScheduler extends
|
|||
case APP_ADDED:
|
||||
{
|
||||
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
|
||||
String queueName =
|
||||
resolveReservationQueueName(appAddedEvent.getQueue(),
|
||||
appAddedEvent.getApplicationId(),
|
||||
appAddedEvent.getReservationID());
|
||||
String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
|
||||
appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
|
||||
appAddedEvent.getIsAppRecovering());
|
||||
if (queueName != null) {
|
||||
if (!appAddedEvent.getIsAppRecovering()) {
|
||||
addApplication(appAddedEvent.getApplicationId(), queueName,
|
||||
|
@ -1664,8 +1663,13 @@ public class CapacityScheduler extends
|
|||
}
|
||||
}
|
||||
|
||||
private String getDefaultReservationQueueName(String planQueueName) {
|
||||
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
|
||||
}
|
||||
|
||||
private synchronized String resolveReservationQueueName(String queueName,
|
||||
ApplicationId applicationId, ReservationId reservationID) {
|
||||
ApplicationId applicationId, ReservationId reservationID,
|
||||
boolean isRecovering) {
|
||||
CSQueue queue = getQueue(queueName);
|
||||
// Check if the queue is a plan queue
|
||||
if ((queue == null) || !(queue instanceof PlanQueue)) {
|
||||
|
@ -1675,10 +1679,15 @@ public class CapacityScheduler extends
|
|||
String resQName = reservationID.toString();
|
||||
queue = getQueue(resQName);
|
||||
if (queue == null) {
|
||||
// reservation has terminated during failover
|
||||
if (isRecovering
|
||||
&& conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) {
|
||||
// move to the default child queue of the plan
|
||||
return getDefaultReservationQueueName(queueName);
|
||||
}
|
||||
String message =
|
||||
"Application "
|
||||
+ applicationId
|
||||
+ " submitted to a reservation which is not yet currently active: "
|
||||
"Application " + applicationId
|
||||
+ " submitted to a reservation which is not currently active: "
|
||||
+ resQName;
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMAppEvent(applicationId,
|
||||
|
@ -1699,7 +1708,7 @@ public class CapacityScheduler extends
|
|||
queueName = resQName;
|
||||
} else {
|
||||
// use the default child queue of the plan for unreserved apps
|
||||
queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
|
||||
queueName = getDefaultReservationQueueName(queueName);
|
||||
}
|
||||
return queueName;
|
||||
}
|
||||
|
|
|
@ -1244,7 +1244,8 @@ public class FairScheduler extends
|
|||
String queueName =
|
||||
resolveReservationQueueName(appAddedEvent.getQueue(),
|
||||
appAddedEvent.getApplicationId(),
|
||||
appAddedEvent.getReservationID());
|
||||
appAddedEvent.getReservationID(),
|
||||
appAddedEvent.getIsAppRecovering());
|
||||
if (queueName != null) {
|
||||
addApplication(appAddedEvent.getApplicationId(),
|
||||
queueName, appAddedEvent.getUser(),
|
||||
|
@ -1317,7 +1318,8 @@ public class FairScheduler extends
|
|||
}
|
||||
|
||||
private synchronized String resolveReservationQueueName(String queueName,
|
||||
ApplicationId applicationId, ReservationId reservationID) {
|
||||
ApplicationId applicationId, ReservationId reservationID,
|
||||
boolean isRecovering) {
|
||||
FSQueue queue = queueMgr.getQueue(queueName);
|
||||
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
|
||||
return queueName;
|
||||
|
@ -1328,6 +1330,11 @@ public class FairScheduler extends
|
|||
String resQName = queueName + "." + reservationID.toString();
|
||||
queue = queueMgr.getQueue(resQName);
|
||||
if (queue == null) {
|
||||
// reservation has terminated during failover
|
||||
if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
|
||||
// move to the default child queue of the plan
|
||||
return getDefaultQueueForPlanQueue(queueName);
|
||||
}
|
||||
String message =
|
||||
"Application "
|
||||
+ applicationId
|
||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -32,7 +33,9 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
|
@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
|
@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
|
@ -77,6 +82,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
|
@ -94,8 +101,6 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.mortbay.log.Log;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
|
||||
|
@ -132,6 +137,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||
if (rm2 != null) {
|
||||
rm2.stop();
|
||||
}
|
||||
conf = null;
|
||||
}
|
||||
|
||||
// Test common scheduler state including SchedulerAttempt, SchedulerNode,
|
||||
|
@ -257,6 +263,152 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
||||
}
|
||||
|
||||
private Configuration getSchedulerDynamicConfiguration() throws IOException {
|
||||
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
|
||||
conf.setTimeDuration(
|
||||
YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1L,
|
||||
TimeUnit.SECONDS);
|
||||
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
||||
CapacitySchedulerConfiguration schedulerConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
ReservationSystemTestUtil.setupDynamicQueueConfiguration(schedulerConf);
|
||||
return schedulerConf;
|
||||
} else {
|
||||
String allocFile = new File(FairSchedulerTestBase.TEST_DIR,
|
||||
TestWorkPreservingRMRestart.class.getSimpleName() + ".xml")
|
||||
.getAbsolutePath();
|
||||
ReservationSystemTestUtil.setupFSAllocationFile(allocFile);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
|
||||
return conf;
|
||||
}
|
||||
}
|
||||
|
||||
// Test work preserving recovery of apps running under reservation.
|
||||
// This involves:
|
||||
// 1. Setting up a dynamic reservable queue,
|
||||
// 2. Submitting an app to it,
|
||||
// 3. Failing over RM,
|
||||
// 4. Validating that the app is recovered post failover,
|
||||
// 5. Check if all running containers are recovered,
|
||||
// 6. Verify the scheduler state like attempt info,
|
||||
// 7. Verify the queue/user metrics for the dynamic reservable queue.
|
||||
@Test(timeout = 30000)
|
||||
public void testDynamicQueueRecovery() throws Exception {
|
||||
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class.getName());
|
||||
|
||||
// 1. Set up dynamic reservable queue.
|
||||
Configuration schedulerConf = getSchedulerDynamicConfiguration();
|
||||
int containerMemory = 1024;
|
||||
Resource containerResource = Resource.newInstance(containerMemory, 1);
|
||||
|
||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(schedulerConf);
|
||||
rm1 = new MockRM(schedulerConf, memStore);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
// 2. Run plan follower to update the added node & then submit app to
|
||||
// dynamic queue.
|
||||
rm1.getRMContext().getReservationSystem()
|
||||
.synchronizePlan(ReservationSystemTestUtil.reservationQ, true);
|
||||
RMApp app1 = rm1.submitApp(200, "dynamicQApp",
|
||||
UserGroupInformation.getCurrentUser().getShortUserName(), null,
|
||||
ReservationSystemTestUtil.getReservationQueueName());
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||
|
||||
// clear queue metrics
|
||||
rm1.clearQueueMetrics(app1);
|
||||
|
||||
// 3. Fail over (restart) RM.
|
||||
rm2 = new MockRM(schedulerConf, memStore);
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// 4. Validate app is recovered post failover.
|
||||
RMApp recoveredApp1 =
|
||||
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
|
||||
RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
|
||||
NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
|
||||
am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
|
||||
NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
|
||||
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
||||
NMContainerStatus completedContainer =
|
||||
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
|
||||
ContainerState.COMPLETE);
|
||||
|
||||
nm1.registerNode(
|
||||
Arrays.asList(amContainer, runningContainer, completedContainer), null);
|
||||
|
||||
// Wait for RM to settle down on recovering containers.
|
||||
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
|
||||
Set<ContainerId> launchedContainers =
|
||||
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
|
||||
.getLaunchedContainers();
|
||||
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
|
||||
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
|
||||
|
||||
// 5. Check RMContainers are re-recreated and the container state is
|
||||
// correct.
|
||||
rm2.waitForState(nm1, amContainer.getContainerId(),
|
||||
RMContainerState.RUNNING);
|
||||
rm2.waitForState(nm1, runningContainer.getContainerId(),
|
||||
RMContainerState.RUNNING);
|
||||
rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
|
||||
|
||||
AbstractYarnScheduler scheduler =
|
||||
(AbstractYarnScheduler) rm2.getResourceScheduler();
|
||||
SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
|
||||
|
||||
// ********* check scheduler node state.*******
|
||||
// 2 running containers.
|
||||
Resource usedResources = Resources.multiply(containerResource, 2);
|
||||
Resource nmResource =
|
||||
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
|
||||
|
||||
assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
|
||||
assertTrue(
|
||||
schedulerNode1.isValidContainer(runningContainer.getContainerId()));
|
||||
assertFalse(
|
||||
schedulerNode1.isValidContainer(completedContainer.getContainerId()));
|
||||
// 2 launched containers, 1 completed container
|
||||
assertEquals(2, schedulerNode1.getNumContainers());
|
||||
|
||||
assertEquals(Resources.subtract(nmResource, usedResources),
|
||||
schedulerNode1.getAvailableResource());
|
||||
assertEquals(usedResources, schedulerNode1.getUsedResource());
|
||||
Resource availableResources = Resources.subtract(nmResource, usedResources);
|
||||
|
||||
// 6. Verify the scheduler state like attempt info.
|
||||
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa =
|
||||
((AbstractYarnScheduler) rm2.getResourceScheduler())
|
||||
.getSchedulerApplications();
|
||||
SchedulerApplication<SchedulerApplicationAttempt> schedulerApp =
|
||||
sa.get(recoveredApp1.getApplicationId());
|
||||
|
||||
// 7. Verify the queue/user metrics for the dynamic reservable queue.
|
||||
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
||||
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
||||
} else {
|
||||
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
||||
}
|
||||
|
||||
// *********** check scheduler attempt state.********
|
||||
SchedulerApplicationAttempt schedulerAttempt =
|
||||
schedulerApp.getCurrentAppAttempt();
|
||||
assertTrue(schedulerAttempt.getLiveContainers()
|
||||
.contains(scheduler.getRMContainer(amContainer.getContainerId())));
|
||||
assertTrue(schedulerAttempt.getLiveContainers()
|
||||
.contains(scheduler.getRMContainer(runningContainer.getContainerId())));
|
||||
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
|
||||
|
||||
// *********** check appSchedulingInfo state ***********
|
||||
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
||||
}
|
||||
|
||||
private void checkCSQueue(MockRM rm,
|
||||
SchedulerApplication<SchedulerApplicationAttempt> app,
|
||||
Resource clusterResource, Resource queueResource, Resource usedResource,
|
||||
|
|
|
@ -304,6 +304,18 @@ public class ReservationSystemTestUtil {
|
|||
conf.setCapacity(A2, 70);
|
||||
}
|
||||
|
||||
public static void setupDynamicQueueConfiguration(
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
// Define top-level queues
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] { reservationQ });
|
||||
final String dedicated = CapacitySchedulerConfiguration.ROOT
|
||||
+ CapacitySchedulerConfiguration.DOT + reservationQ;
|
||||
conf.setCapacity(dedicated, 100);
|
||||
// Set as reservation queue
|
||||
conf.setReservable(dedicated, true);
|
||||
}
|
||||
|
||||
public static String getFullReservationQueueName() {
|
||||
return CapacitySchedulerConfiguration.ROOT
|
||||
+ CapacitySchedulerConfiguration.DOT + reservationQ;
|
||||
|
|
Loading…
Reference in New Issue