YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with queue and headroom checks. (Tsuyoshi Ozawa via kasha)
This commit is contained in:
parent
0126cf16b7
commit
179cab81e0
|
@ -56,6 +56,9 @@ Release 2.7.0 - UNRELEASED
|
|||
YARN-2742. FairSchedulerConfiguration should allow extra spaces
|
||||
between value and unit. (Wei Yan via kasha)
|
||||
|
||||
YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with
|
||||
queue and headroom checks. (Tsuyoshi Ozawa via kasha)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -305,6 +305,7 @@ public class FairScheduler extends
|
|||
// Recursively compute fair shares for all queues
|
||||
// and update metrics
|
||||
rootQueue.recomputeShares();
|
||||
updateRootQueueMetrics();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
if (--updatesToSkipForDebug < 0) {
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
@ -47,6 +49,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
|
@ -65,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
@ -148,6 +154,9 @@ public class TestWorkPreservingRMRestart {
|
|||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
if (schedulerClass.equals(FairScheduler.class)) {
|
||||
initFairScheduler(rm1);
|
||||
}
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||
|
@ -160,6 +169,9 @@ public class TestWorkPreservingRMRestart {
|
|||
|
||||
// Re-start RM
|
||||
rm2 = new MockRM(conf, memStore);
|
||||
if (schedulerClass.equals(FairScheduler.class)) {
|
||||
initFairScheduler(rm2);
|
||||
}
|
||||
rm2.start();
|
||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||
// recover app
|
||||
|
@ -227,7 +239,9 @@ public class TestWorkPreservingRMRestart {
|
|||
if (schedulerClass.equals(CapacityScheduler.class)) {
|
||||
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
||||
} else if (schedulerClass.equals(FifoScheduler.class)) {
|
||||
checkFifoQueue(schedulerApp, usedResources, availableResources);
|
||||
checkFifoQueue(rm2, schedulerApp, usedResources, availableResources);
|
||||
} else if (schedulerClass.equals(FairScheduler.class)) {
|
||||
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
||||
}
|
||||
|
||||
// *********** check scheduler attempt state.********
|
||||
|
@ -239,11 +253,6 @@ public class TestWorkPreservingRMRestart {
|
|||
scheduler.getRMContainer(runningContainer.getContainerId())));
|
||||
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
|
||||
|
||||
// Until YARN-1959 is resolved
|
||||
if (scheduler.getClass() != FairScheduler.class) {
|
||||
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||
}
|
||||
|
||||
// *********** check appSchedulingInfo state ***********
|
||||
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
||||
}
|
||||
|
@ -253,21 +262,26 @@ public class TestWorkPreservingRMRestart {
|
|||
Resource clusterResource, Resource queueResource, Resource usedResource,
|
||||
int numContainers)
|
||||
throws Exception {
|
||||
checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource,
|
||||
checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
|
||||
numContainers);
|
||||
|
||||
LeafQueue queue = (LeafQueue) app.getQueue();
|
||||
Resource availableResources = Resources.subtract(queueResource, usedResource);
|
||||
Resource availableResources =
|
||||
Resources.subtract(queueResource, usedResource);
|
||||
// ************ check app headroom ****************
|
||||
SchedulerApplicationAttempt schedulerAttempt = app.getCurrentAppAttempt();
|
||||
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||
|
||||
// ************* check Queue metrics ************
|
||||
QueueMetrics queueMetrics = queue.getMetrics();
|
||||
asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResource.getMemory(),
|
||||
usedResource.getVirtualCores());
|
||||
|
||||
// ************ check user metrics ***********
|
||||
QueueMetrics userMetrics =
|
||||
queueMetrics.getUserMetrics(app.getUser());
|
||||
asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResource.getMemory(),
|
||||
usedResource.getVirtualCores());
|
||||
}
|
||||
|
@ -297,9 +311,10 @@ public class TestWorkPreservingRMRestart {
|
|||
.getTotalConsumedResources());
|
||||
}
|
||||
|
||||
private void checkFifoQueue(SchedulerApplication schedulerApp,
|
||||
Resource usedResources, Resource availableResources) throws Exception {
|
||||
FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler();
|
||||
private void checkFifoQueue(ResourceManager rm,
|
||||
SchedulerApplication schedulerApp, Resource usedResources,
|
||||
Resource availableResources) throws Exception {
|
||||
FifoScheduler scheduler = (FifoScheduler) rm.getResourceScheduler();
|
||||
// ************ check cluster used Resources ********
|
||||
assertEquals(usedResources, scheduler.getUsedResource());
|
||||
|
||||
|
@ -310,11 +325,70 @@ public class TestWorkPreservingRMRestart {
|
|||
|
||||
// ************ check queue metrics ****************
|
||||
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
||||
asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResources.getMemory(),
|
||||
usedResources.getVirtualCores());
|
||||
}
|
||||
|
||||
private void checkFSQueue(ResourceManager rm,
|
||||
SchedulerApplication schedulerApp, Resource usedResources,
|
||||
Resource availableResources) throws Exception {
|
||||
// waiting for RM's scheduling apps
|
||||
int retry = 0;
|
||||
Resource assumedFairShare = Resource.newInstance(8192, 8);
|
||||
while (true) {
|
||||
Thread.sleep(100);
|
||||
if (assumedFairShare.equals(((FairScheduler)rm.getResourceScheduler())
|
||||
.getQueueManager().getRootQueue().getFairShare())) {
|
||||
break;
|
||||
}
|
||||
retry++;
|
||||
if (retry > 30) {
|
||||
Assert.fail("Apps are not scheduled within assumed timeout");
|
||||
}
|
||||
}
|
||||
|
||||
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
||||
FSParentQueue root = scheduler.getQueueManager().getRootQueue();
|
||||
// ************ check cluster used Resources ********
|
||||
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
|
||||
assertEquals(usedResources,root.getResourceUsage());
|
||||
|
||||
// ************ check app headroom ****************
|
||||
FSAppAttempt schedulerAttempt =
|
||||
(FSAppAttempt) schedulerApp.getCurrentAppAttempt();
|
||||
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||
|
||||
// ************ check queue metrics ****************
|
||||
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
||||
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||
availableResources.getVirtualCores(), usedResources.getMemory(),
|
||||
usedResources.getVirtualCores());
|
||||
}
|
||||
|
||||
private void initFairScheduler(ResourceManager rm) throws IOException {
|
||||
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
||||
String testDir =
|
||||
new File(
|
||||
System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
|
||||
String allocFile = new File(testDir, "test-queues").getAbsolutePath();
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(allocFile));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
||||
out.println("<queue name=\"root\">");
|
||||
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||
out.println(" <weight>1.0</weight>");
|
||||
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
|
||||
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
|
||||
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
}
|
||||
|
||||
// create 3 container reports for AM
|
||||
public static List<NMContainerStatus>
|
||||
createNMContainerStatusForApp(MockAM am) {
|
||||
|
@ -462,9 +536,10 @@ public class TestWorkPreservingRMRestart {
|
|||
checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
|
||||
q1UsedResource, 4);
|
||||
QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
|
||||
asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4,
|
||||
q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
|
||||
q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
|
||||
assertMetrics(queue1Metrics, 2, 0, 2, 0, 4,
|
||||
q1availableResources.getMemory(),
|
||||
q1availableResources.getVirtualCores(), q1UsedResource.getMemory(),
|
||||
q1UsedResource.getVirtualCores());
|
||||
|
||||
// assert queue B state.
|
||||
SchedulerApplication schedulerApp2 =
|
||||
|
@ -472,16 +547,17 @@ public class TestWorkPreservingRMRestart {
|
|||
checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
|
||||
q2UsedResource, 2);
|
||||
QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
|
||||
asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2,
|
||||
q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
|
||||
q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
|
||||
assertMetrics(queue2Metrics, 1, 0, 1, 0, 2,
|
||||
q2availableResources.getMemory(),
|
||||
q2availableResources.getVirtualCores(), q2UsedResource.getMemory(),
|
||||
q2UsedResource.getVirtualCores());
|
||||
|
||||
// assert parent queue state.
|
||||
LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
|
||||
ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
|
||||
checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
|
||||
(float) 6 / 16);
|
||||
asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
|
||||
assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
|
||||
totalAvailableResource.getMemory(),
|
||||
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
|
||||
totalUsedResource.getVirtualCores());
|
||||
|
@ -818,7 +894,7 @@ public class TestWorkPreservingRMRestart {
|
|||
}, 1000, 20000);
|
||||
}
|
||||
|
||||
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
||||
private void assertMetrics(QueueMetrics qm, int appsSubmitted,
|
||||
int appsPending, int appsRunning, int appsCompleted,
|
||||
int allocatedContainers, int availableMB, int availableVirtualCores,
|
||||
int allocatedMB, int allocatedVirtualCores) {
|
||||
|
|
Loading…
Reference in New Issue