YARN-3738. Add support for recovery of reserved apps running under dynamic queues (subru via asuresh)

This commit is contained in:
Arun Suresh 2015-10-24 22:53:10 -07:00
parent 446212a39e
commit ab8eb8770c
5 changed files with 196 additions and 13 deletions

View File

@ -537,6 +537,9 @@ Release 2.8.0 - UNRELEASED
YARN-4296. DistributedShell Log.info is not friendly.
(Xiaowei Wang via stevel)
YARN-3738. Add support for recovery of reserved apps running under dynamic
queues (subru via asuresh)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -1320,10 +1320,9 @@ public class CapacityScheduler extends
case APP_ADDED:
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
String queueName =
resolveReservationQueueName(appAddedEvent.getQueue(),
appAddedEvent.getApplicationId(),
appAddedEvent.getReservationID());
String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
appAddedEvent.getIsAppRecovering());
if (queueName != null) {
if (!appAddedEvent.getIsAppRecovering()) {
addApplication(appAddedEvent.getApplicationId(), queueName,
@ -1664,8 +1663,13 @@ public class CapacityScheduler extends
}
}
private String getDefaultReservationQueueName(String planQueueName) {
return planQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
}
private synchronized String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID) {
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
CSQueue queue = getQueue(queueName);
// Check if the queue is a plan queue
if ((queue == null) || !(queue instanceof PlanQueue)) {
@ -1675,10 +1679,15 @@ public class CapacityScheduler extends
String resQName = reservationID.toString();
queue = getQueue(resQName);
if (queue == null) {
// reservation has terminated during failover
if (isRecovering
&& conf.getMoveOnExpiry(getQueue(queueName).getQueuePath())) {
// move to the default child queue of the plan
return getDefaultReservationQueueName(queueName);
}
String message =
"Application "
+ applicationId
+ " submitted to a reservation which is not yet currently active: "
"Application " + applicationId
+ " submitted to a reservation which is not currently active: "
+ resQName;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId,
@ -1699,7 +1708,7 @@ public class CapacityScheduler extends
queueName = resQName;
} else {
// use the default child queue of the plan for unreserved apps
queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
queueName = getDefaultReservationQueueName(queueName);
}
return queueName;
}

View File

@ -1244,7 +1244,8 @@ public class FairScheduler extends
String queueName =
resolveReservationQueueName(appAddedEvent.getQueue(),
appAddedEvent.getApplicationId(),
appAddedEvent.getReservationID());
appAddedEvent.getReservationID(),
appAddedEvent.getIsAppRecovering());
if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(),
@ -1317,7 +1318,8 @@ public class FairScheduler extends
}
private synchronized String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID) {
ApplicationId applicationId, ReservationId reservationID,
boolean isRecovering) {
FSQueue queue = queueMgr.getQueue(queueName);
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
return queueName;
@ -1328,6 +1330,11 @@ public class FairScheduler extends
String resQName = queueName + "." + reservationID.toString();
queue = queueMgr.getQueue(resQName);
if (queue == null) {
// reservation has terminated during failover
if (isRecovering && allocConf.getMoveOnExpiry(queueName)) {
// move to the default child queue of the plan
return getDefaultQueueForPlanQueue(queueName);
}
String message =
"Application "
+ applicationId

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -32,7 +33,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
@ -57,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityM
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -66,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -77,6 +82,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.ControlledClock;
@ -94,8 +101,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mortbay.log.Log;
import com.google.common.base.Supplier;
@ -132,6 +137,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
if (rm2 != null) {
rm2.stop();
}
conf = null;
}
// Test common scheduler state including SchedulerAttempt, SchedulerNode,
@ -257,6 +263,152 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
}
private Configuration getSchedulerDynamicConfiguration() throws IOException {
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
conf.setTimeDuration(
YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, 1L,
TimeUnit.SECONDS);
if (getSchedulerType() == SchedulerType.CAPACITY) {
CapacitySchedulerConfiguration schedulerConf =
new CapacitySchedulerConfiguration(conf);
ReservationSystemTestUtil.setupDynamicQueueConfiguration(schedulerConf);
return schedulerConf;
} else {
String allocFile = new File(FairSchedulerTestBase.TEST_DIR,
TestWorkPreservingRMRestart.class.getSimpleName() + ".xml")
.getAbsolutePath();
ReservationSystemTestUtil.setupFSAllocationFile(allocFile);
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
return conf;
}
}
// Test work preserving recovery of apps running under reservation.
// This involves:
// 1. Setting up a dynamic reservable queue,
// 2. Submitting an app to it,
// 3. Failing over RM,
// 4. Validating that the app is recovered post failover,
// 5. Check if all running containers are recovered,
// 6. Verify the scheduler state like attempt info,
// 7. Verify the queue/user metrics for the dynamic reservable queue.
@Test(timeout = 30000)
public void testDynamicQueueRecovery() throws Exception {
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
// 1. Set up dynamic reservable queue.
Configuration schedulerConf = getSchedulerDynamicConfiguration();
int containerMemory = 1024;
Resource containerResource = Resource.newInstance(containerMemory, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(schedulerConf);
rm1 = new MockRM(schedulerConf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
// 2. Run plan follower to update the added node & then submit app to
// dynamic queue.
rm1.getRMContext().getReservationSystem()
.synchronizePlan(ReservationSystemTestUtil.reservationQ, true);
RMApp app1 = rm1.submitApp(200, "dynamicQApp",
UserGroupInformation.getCurrentUser().getShortUserName(), null,
ReservationSystemTestUtil.getReservationQueueName());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// clear queue metrics
rm1.clearQueueMetrics(app1);
// 3. Fail over (restart) RM.
rm2 = new MockRM(schedulerConf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// 4. Validate app is recovered post failover.
RMApp recoveredApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt();
NMContainerStatus amContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 1, ContainerState.RUNNING);
NMContainerStatus runningContainer = TestRMRestart.createNMContainerStatus(
am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
NMContainerStatus completedContainer =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE);
nm1.registerNode(
Arrays.asList(amContainer, runningContainer, completedContainer), null);
// Wait for RM to settle down on recovering containers.
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
Set<ContainerId> launchedContainers =
((RMNodeImpl) rm2.getRMContext().getRMNodes().get(nm1.getNodeId()))
.getLaunchedContainers();
assertTrue(launchedContainers.contains(amContainer.getContainerId()));
assertTrue(launchedContainers.contains(runningContainer.getContainerId()));
// 5. Check RMContainers are re-recreated and the container state is
// correct.
rm2.waitForState(nm1, amContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForState(nm1, runningContainer.getContainerId(),
RMContainerState.RUNNING);
rm2.waitForContainerToComplete(loadedAttempt1, completedContainer);
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm2.getResourceScheduler();
SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId());
// ********* check scheduler node state.*******
// 2 running containers.
Resource usedResources = Resources.multiply(containerResource, 2);
Resource nmResource =
Resource.newInstance(nm1.getMemory(), nm1.getvCores());
assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
assertTrue(
schedulerNode1.isValidContainer(runningContainer.getContainerId()));
assertFalse(
schedulerNode1.isValidContainer(completedContainer.getContainerId()));
// 2 launched containers, 1 completed container
assertEquals(2, schedulerNode1.getNumContainers());
assertEquals(Resources.subtract(nmResource, usedResources),
schedulerNode1.getAvailableResource());
assertEquals(usedResources, schedulerNode1.getUsedResource());
Resource availableResources = Resources.subtract(nmResource, usedResources);
// 6. Verify the scheduler state like attempt info.
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> sa =
((AbstractYarnScheduler) rm2.getResourceScheduler())
.getSchedulerApplications();
SchedulerApplication<SchedulerApplicationAttempt> schedulerApp =
sa.get(recoveredApp1.getApplicationId());
// 7. Verify the queue/user metrics for the dynamic reservable queue.
if (getSchedulerType() == SchedulerType.CAPACITY) {
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
} else {
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
}
// *********** check scheduler attempt state.********
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();
assertTrue(schedulerAttempt.getLiveContainers()
.contains(scheduler.getRMContainer(amContainer.getContainerId())));
assertTrue(schedulerAttempt.getLiveContainers()
.contains(scheduler.getRMContainer(runningContainer.getContainerId())));
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
// *********** check appSchedulingInfo state ***********
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
}
private void checkCSQueue(MockRM rm,
SchedulerApplication<SchedulerApplicationAttempt> app,
Resource clusterResource, Resource queueResource, Resource usedResource,

View File

@ -304,6 +304,18 @@ public class ReservationSystemTestUtil {
conf.setCapacity(A2, 70);
}
public static void setupDynamicQueueConfiguration(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { reservationQ });
final String dedicated = CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;
conf.setCapacity(dedicated, 100);
// Set as reservation queue
conf.setReservable(dedicated, true);
}
public static String getFullReservationQueueName() {
return CapacitySchedulerConfiguration.ROOT
+ CapacitySchedulerConfiguration.DOT + reservationQ;