YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with queue and headroom checks. (Tsuyoshi Ozawa via kasha)
(cherry picked from commit 179cab81e0
)
This commit is contained in:
parent
8bfef59029
commit
d2ba115f06
|
@ -29,6 +29,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2742. FairSchedulerConfiguration should allow extra spaces
|
YARN-2742. FairSchedulerConfiguration should allow extra spaces
|
||||||
between value and unit. (Wei Yan via kasha)
|
between value and unit. (Wei Yan via kasha)
|
||||||
|
|
||||||
|
YARN-2712. TestWorkPreservingRMRestart: Augment FS tests with
|
||||||
|
queue and headroom checks. (Tsuyoshi Ozawa via kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -305,6 +305,7 @@ public class FairScheduler extends
|
||||||
// Recursively compute fair shares for all queues
|
// Recursively compute fair shares for all queues
|
||||||
// and update metrics
|
// and update metrics
|
||||||
rootQueue.recomputeShares();
|
rootQueue.recomputeShares();
|
||||||
|
updateRootQueueMetrics();
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
if (--updatesToSkipForDebug < 0) {
|
if (--updatesToSkipForDebug < 0) {
|
||||||
|
|
|
@ -18,8 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
import java.io.File;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import java.io.FileWriter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.PrintWriter;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
@ -47,6 +49,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
@ -65,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
import org.apache.hadoop.yarn.util.SystemClock;
|
import org.apache.hadoop.yarn.util.SystemClock;
|
||||||
|
@ -148,6 +154,9 @@ public class TestWorkPreservingRMRestart {
|
||||||
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
rm1 = new MockRM(conf, memStore);
|
rm1 = new MockRM(conf, memStore);
|
||||||
|
if (schedulerClass.equals(FairScheduler.class)) {
|
||||||
|
initFairScheduler(rm1);
|
||||||
|
}
|
||||||
rm1.start();
|
rm1.start();
|
||||||
MockNM nm1 =
|
MockNM nm1 =
|
||||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
|
@ -160,6 +169,9 @@ public class TestWorkPreservingRMRestart {
|
||||||
|
|
||||||
// Re-start RM
|
// Re-start RM
|
||||||
rm2 = new MockRM(conf, memStore);
|
rm2 = new MockRM(conf, memStore);
|
||||||
|
if (schedulerClass.equals(FairScheduler.class)) {
|
||||||
|
initFairScheduler(rm2);
|
||||||
|
}
|
||||||
rm2.start();
|
rm2.start();
|
||||||
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
// recover app
|
// recover app
|
||||||
|
@ -227,7 +239,9 @@ public class TestWorkPreservingRMRestart {
|
||||||
if (schedulerClass.equals(CapacityScheduler.class)) {
|
if (schedulerClass.equals(CapacityScheduler.class)) {
|
||||||
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
||||||
} else if (schedulerClass.equals(FifoScheduler.class)) {
|
} else if (schedulerClass.equals(FifoScheduler.class)) {
|
||||||
checkFifoQueue(schedulerApp, usedResources, availableResources);
|
checkFifoQueue(rm2, schedulerApp, usedResources, availableResources);
|
||||||
|
} else if (schedulerClass.equals(FairScheduler.class)) {
|
||||||
|
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
||||||
}
|
}
|
||||||
|
|
||||||
// *********** check scheduler attempt state.********
|
// *********** check scheduler attempt state.********
|
||||||
|
@ -239,11 +253,6 @@ public class TestWorkPreservingRMRestart {
|
||||||
scheduler.getRMContainer(runningContainer.getContainerId())));
|
scheduler.getRMContainer(runningContainer.getContainerId())));
|
||||||
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
|
assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources);
|
||||||
|
|
||||||
// Until YARN-1959 is resolved
|
|
||||||
if (scheduler.getClass() != FairScheduler.class) {
|
|
||||||
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
|
||||||
}
|
|
||||||
|
|
||||||
// *********** check appSchedulingInfo state ***********
|
// *********** check appSchedulingInfo state ***********
|
||||||
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
assertEquals((1L << 40) + 1L, schedulerAttempt.getNewContainerId());
|
||||||
}
|
}
|
||||||
|
@ -253,23 +262,28 @@ public class TestWorkPreservingRMRestart {
|
||||||
Resource clusterResource, Resource queueResource, Resource usedResource,
|
Resource clusterResource, Resource queueResource, Resource usedResource,
|
||||||
int numContainers)
|
int numContainers)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource,
|
checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
|
||||||
numContainers);
|
numContainers);
|
||||||
|
|
||||||
LeafQueue queue = (LeafQueue) app.getQueue();
|
LeafQueue queue = (LeafQueue) app.getQueue();
|
||||||
Resource availableResources = Resources.subtract(queueResource, usedResource);
|
Resource availableResources =
|
||||||
|
Resources.subtract(queueResource, usedResource);
|
||||||
|
// ************ check app headroom ****************
|
||||||
|
SchedulerApplicationAttempt schedulerAttempt = app.getCurrentAppAttempt();
|
||||||
|
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||||
|
|
||||||
// ************* check Queue metrics ************
|
// ************* check Queue metrics ************
|
||||||
QueueMetrics queueMetrics = queue.getMetrics();
|
QueueMetrics queueMetrics = queue.getMetrics();
|
||||||
asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||||
availableResources.getVirtualCores(), usedResource.getMemory(),
|
availableResources.getVirtualCores(), usedResource.getMemory(),
|
||||||
usedResource.getVirtualCores());
|
usedResource.getVirtualCores());
|
||||||
|
|
||||||
// ************ check user metrics ***********
|
// ************ check user metrics ***********
|
||||||
QueueMetrics userMetrics =
|
QueueMetrics userMetrics =
|
||||||
queueMetrics.getUserMetrics(app.getUser());
|
queueMetrics.getUserMetrics(app.getUser());
|
||||||
asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
assertMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||||
availableResources.getVirtualCores(), usedResource.getMemory(),
|
availableResources.getVirtualCores(), usedResource.getMemory(),
|
||||||
usedResource.getVirtualCores());
|
usedResource.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkCSLeafQueue(MockRM rm,
|
private void checkCSLeafQueue(MockRM rm,
|
||||||
|
@ -297,9 +311,10 @@ public class TestWorkPreservingRMRestart {
|
||||||
.getTotalConsumedResources());
|
.getTotalConsumedResources());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkFifoQueue(SchedulerApplication schedulerApp,
|
private void checkFifoQueue(ResourceManager rm,
|
||||||
Resource usedResources, Resource availableResources) throws Exception {
|
SchedulerApplication schedulerApp, Resource usedResources,
|
||||||
FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler();
|
Resource availableResources) throws Exception {
|
||||||
|
FifoScheduler scheduler = (FifoScheduler) rm.getResourceScheduler();
|
||||||
// ************ check cluster used Resources ********
|
// ************ check cluster used Resources ********
|
||||||
assertEquals(usedResources, scheduler.getUsedResource());
|
assertEquals(usedResources, scheduler.getUsedResource());
|
||||||
|
|
||||||
|
@ -310,9 +325,68 @@ public class TestWorkPreservingRMRestart {
|
||||||
|
|
||||||
// ************ check queue metrics ****************
|
// ************ check queue metrics ****************
|
||||||
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
||||||
asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||||
availableResources.getVirtualCores(), usedResources.getMemory(),
|
availableResources.getVirtualCores(), usedResources.getMemory(),
|
||||||
usedResources.getVirtualCores());
|
usedResources.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkFSQueue(ResourceManager rm,
|
||||||
|
SchedulerApplication schedulerApp, Resource usedResources,
|
||||||
|
Resource availableResources) throws Exception {
|
||||||
|
// waiting for RM's scheduling apps
|
||||||
|
int retry = 0;
|
||||||
|
Resource assumedFairShare = Resource.newInstance(8192, 8);
|
||||||
|
while (true) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
if (assumedFairShare.equals(((FairScheduler)rm.getResourceScheduler())
|
||||||
|
.getQueueManager().getRootQueue().getFairShare())) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
retry++;
|
||||||
|
if (retry > 30) {
|
||||||
|
Assert.fail("Apps are not scheduled within assumed timeout");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
||||||
|
FSParentQueue root = scheduler.getQueueManager().getRootQueue();
|
||||||
|
// ************ check cluster used Resources ********
|
||||||
|
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
|
||||||
|
assertEquals(usedResources,root.getResourceUsage());
|
||||||
|
|
||||||
|
// ************ check app headroom ****************
|
||||||
|
FSAppAttempt schedulerAttempt =
|
||||||
|
(FSAppAttempt) schedulerApp.getCurrentAppAttempt();
|
||||||
|
assertEquals(availableResources, schedulerAttempt.getHeadroom());
|
||||||
|
|
||||||
|
// ************ check queue metrics ****************
|
||||||
|
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
|
||||||
|
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
|
||||||
|
availableResources.getVirtualCores(), usedResources.getMemory(),
|
||||||
|
usedResources.getVirtualCores());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initFairScheduler(ResourceManager rm) throws IOException {
|
||||||
|
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
|
||||||
|
String testDir =
|
||||||
|
new File(
|
||||||
|
System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
|
||||||
|
String allocFile = new File(testDir, "test-queues").getAbsolutePath();
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(allocFile));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" <weight>1.0</weight>");
|
||||||
|
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
|
||||||
|
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
|
||||||
|
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
// create 3 container reports for AM
|
// create 3 container reports for AM
|
||||||
|
@ -462,9 +536,10 @@ public class TestWorkPreservingRMRestart {
|
||||||
checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
|
checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource,
|
||||||
q1UsedResource, 4);
|
q1UsedResource, 4);
|
||||||
QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
|
QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics();
|
||||||
asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4,
|
assertMetrics(queue1Metrics, 2, 0, 2, 0, 4,
|
||||||
q1availableResources.getMemory(), q1availableResources.getVirtualCores(),
|
q1availableResources.getMemory(),
|
||||||
q1UsedResource.getMemory(), q1UsedResource.getVirtualCores());
|
q1availableResources.getVirtualCores(), q1UsedResource.getMemory(),
|
||||||
|
q1UsedResource.getVirtualCores());
|
||||||
|
|
||||||
// assert queue B state.
|
// assert queue B state.
|
||||||
SchedulerApplication schedulerApp2 =
|
SchedulerApplication schedulerApp2 =
|
||||||
|
@ -472,19 +547,20 @@ public class TestWorkPreservingRMRestart {
|
||||||
checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
|
checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource,
|
||||||
q2UsedResource, 2);
|
q2UsedResource, 2);
|
||||||
QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
|
QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics();
|
||||||
asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2,
|
assertMetrics(queue2Metrics, 1, 0, 1, 0, 2,
|
||||||
q2availableResources.getMemory(), q2availableResources.getVirtualCores(),
|
q2availableResources.getMemory(),
|
||||||
q2UsedResource.getMemory(), q2UsedResource.getVirtualCores());
|
q2availableResources.getVirtualCores(), q2UsedResource.getMemory(),
|
||||||
|
q2UsedResource.getVirtualCores());
|
||||||
|
|
||||||
// assert parent queue state.
|
// assert parent queue state.
|
||||||
LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
|
LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue();
|
||||||
ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
|
ParentQueue parentQueue = (ParentQueue) leafQueue.getParent();
|
||||||
checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
|
checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16,
|
||||||
(float) 6 / 16);
|
(float) 6 / 16);
|
||||||
asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
|
assertMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6,
|
||||||
totalAvailableResource.getMemory(),
|
totalAvailableResource.getMemory(),
|
||||||
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
|
totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(),
|
||||||
totalUsedResource.getVirtualCores());
|
totalUsedResource.getVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
//Test that we receive a meaningful exit-causing exception if a queue
|
//Test that we receive a meaningful exit-causing exception if a queue
|
||||||
|
@ -818,7 +894,7 @@ public class TestWorkPreservingRMRestart {
|
||||||
}, 1000, 20000);
|
}, 1000, 20000);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
|
private void assertMetrics(QueueMetrics qm, int appsSubmitted,
|
||||||
int appsPending, int appsRunning, int appsCompleted,
|
int appsPending, int appsRunning, int appsCompleted,
|
||||||
int allocatedContainers, int availableMB, int availableVirtualCores,
|
int allocatedContainers, int availableMB, int availableVirtualCores,
|
||||||
int allocatedMB, int allocatedVirtualCores) {
|
int allocatedMB, int allocatedVirtualCores) {
|
||||||
|
|
Loading…
Reference in New Issue