YARN-7146. Many RM unit tests failing with FairScheduler (rkanter)

This commit is contained in:
Robert Kanter 2017-09-13 15:44:54 -07:00
parent 4d98936eec
commit bb34ae9554
18 changed files with 296 additions and 187 deletions

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -121,6 +122,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -131,14 +133,16 @@ import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestYarnClient {
@Test
public void test() {
// More to come later.
@Before
public void setup() {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
}
@Test

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -125,8 +126,20 @@ public abstract class AbstractYarnScheduler
protected SchedulerHealth schedulerHealth = new SchedulerHealth();
protected volatile long lastNodeUpdateTime;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
private volatile Clock clock;
/**
* To enable the update thread, subclasses should set updateInterval to a
* positive value during {@link #serviceInit(Configuration)}.
*/
protected long updateInterval = -1L;
@VisibleForTesting
Thread updateThread;
private final Object updateThreadMonitor = new Object();
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
@ -187,9 +200,35 @@ public abstract class AbstractYarnScheduler
autoUpdateContainers =
conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,
YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS);
if (updateInterval > 0) {
updateThread = new UpdateThread();
updateThread.setName("SchedulerUpdateThread");
updateThread.setUncaughtExceptionHandler(
new RMCriticalThreadUncaughtExceptionHandler(rmContext));
updateThread.setDaemon(true);
}
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
if (updateThread != null) {
updateThread.start();
}
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
if (updateThread != null) {
updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS);
}
super.serviceStop();
}
@VisibleForTesting
public ClusterNodeTracker getNodeTracker() {
return nodeTracker;
@ -1353,4 +1392,51 @@ public abstract class AbstractYarnScheduler
public long getMaximumApplicationLifetime(String queueName) {
return -1;
}
/**
* Update internal state of the scheduler. This can be useful for scheduler
* implementations that maintain some state that needs to be periodically
* updated; for example, metrics or queue resources. It will be called by the
* {@link UpdateThread} every {@link #updateInterval}. By default, it will
* not run; subclasses should set {@link #updateInterval} to a
* positive value during {@link #serviceInit(Configuration)} if they want to
* enable the thread.
*/
@VisibleForTesting
public void update() {
// do nothing by default
}
/**
* Thread which calls {@link #update()} every
* <code>updateInterval</code> milliseconds.
*/
private class UpdateThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
synchronized (updateThreadMonitor) {
updateThreadMonitor.wait(updateInterval);
}
update();
} catch (InterruptedException ie) {
LOG.warn("Scheduler UpdateThread interrupted. Exiting.");
return;
} catch (Exception e) {
LOG.error("Exception in scheduler UpdateThread", e);
}
}
}
}
/**
* Allows {@link UpdateThread} to start processing without waiting till
* {@link #updateInterval}.
*/
protected void triggerUpdate() {
synchronized (updateThreadMonitor) {
updateThreadMonitor.notify();
}
}
}

View File

@ -151,24 +151,14 @@ public class FairScheduler extends
// reserved
public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
// How often fair shares are re-calculated (ms)
protected long updateInterval;
private final int UPDATE_DEBUG_FREQUENCY = 25;
private int updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
@VisibleForTesting
Thread updateThread;
private final Object updateThreadMonitor = new Object();
@VisibleForTesting
Thread schedulingThread;
Thread preemptionThread;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
// Aggregate metrics
FSQueueMetrics rootMetrics;
FSOpDurations fsOpDurations;
@ -292,40 +282,6 @@ public class FairScheduler extends
return queueMgr;
}
// Allows UpdateThread to start processing without waiting till updateInterval
void triggerUpdate() {
synchronized (updateThreadMonitor) {
updateThreadMonitor.notify();
}
}
/**
* Thread which calls {@link FairScheduler#update()} every
* <code>updateInterval</code> milliseconds.
*/
private class UpdateThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
synchronized (updateThreadMonitor) {
updateThreadMonitor.wait(updateInterval);
}
long start = getClock().getTime();
update();
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateThreadRunDuration(duration);
} catch (InterruptedException ie) {
LOG.warn("Update thread interrupted. Exiting.");
return;
} catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e);
}
}
}
}
/**
* Thread which attempts scheduling resources continuously,
* asynchronous to the node heartbeats.
@ -367,7 +323,10 @@ public class FairScheduler extends
* required resources per job.
*/
@VisibleForTesting
@Override
public void update() {
// Storing start time for fsOpDurations
long start = getClock().getTime();
FSQueue rootQueue = queueMgr.getRootQueue();
// Update demands and fairshares
@ -402,6 +361,7 @@ public class FairScheduler extends
} finally {
readLock.unlock();
}
fsOpDurations.addUpdateThreadRunDuration(getClock().getTime() - start);
}
public RMContainerTokenSecretManager
@ -1339,12 +1299,6 @@ public class FairScheduler extends
throw new IOException("Failed to start FairScheduler", e);
}
updateThread = new UpdateThread();
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setUncaughtExceptionHandler(
new RMCriticalThreadUncaughtExceptionHandler(rmContext));
updateThread.setDaemon(true);
if (continuousSchedulingEnabled) {
// start continuous scheduling thread
schedulingThread = new ContinuousSchedulingThread();
@ -1391,9 +1345,7 @@ public class FairScheduler extends
private void startSchedulerThreads() {
try {
writeLock.lock();
Preconditions.checkNotNull(updateThread, "updateThread is null");
Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");
updateThread.start();
if (continuousSchedulingEnabled) {
Preconditions.checkNotNull(schedulingThread,
"schedulingThread is null");
@ -1424,10 +1376,6 @@ public class FairScheduler extends
public void serviceStop() throws Exception {
try {
writeLock.lock();
if (updateThread != null) {
updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS);
}
if (continuousSchedulingEnabled) {
if (schedulingThread != null) {
schedulingThread.interrupt();

View File

@ -817,6 +817,8 @@ public class MockRM extends ResourceManager {
RMAppAttemptState.SCHEDULED);
}
((AbstractYarnScheduler)getResourceScheduler()).update();
return rmApp;
}
@ -940,6 +942,7 @@ public class MockRM extends ResourceManager {
public MockAM sendAMLaunched(ApplicationAttemptId appAttemptId)
throws Exception {
MockAM am = new MockAM(getRMContext(), masterService, appAttemptId);
((AbstractYarnScheduler)scheduler).update();
waitForState(appAttemptId, RMAppAttemptState.ALLOCATED);
//create and set AMRMToken
Token<AMRMTokenIdentifier> amrmToken =
@ -1164,6 +1167,7 @@ public class MockRM extends ResourceManager {
RMAppAttempt attempt = waitForAttemptScheduled(app, rm);
LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
rm.drainEventsImplicitly();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
@ -1179,6 +1183,7 @@ public class MockRM extends ResourceManager {
waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
LOG.info("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
rm.drainEventsImplicitly();
MockAM am = new MockAM(rm.getRMContext(), rm.masterService,
attempt.getAppAttemptId());

View File

@ -18,53 +18,74 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public abstract class ParameterizedSchedulerTestBase {
protected final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
private final static String FS_ALLOC_FILE =
new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath();
private SchedulerType schedulerType;
private YarnConfiguration conf = null;
private AbstractYarnScheduler scheduler = null;
public enum SchedulerType {
CAPACITY, FAIR
}
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> getParameters() {
return Arrays.stream(SchedulerType.values()).map(
type -> new Object[]{type}).collect(Collectors.toList());
}
private SchedulerType schedulerType;
private YarnConfiguration conf = null;
private AbstractYarnScheduler scheduler = null;
public YarnConfiguration getConf() {
return conf;
}
@Before
public void configureScheduler() throws IOException, ClassNotFoundException {
// Due to parameterization, this gets called before each test method
public ParameterizedSchedulerTestBase(SchedulerType type)
throws IOException {
conf = new YarnConfiguration();
Class schedulerClass =
conf.getClass(YarnConfiguration.RM_SCHEDULER,
Class.forName(YarnConfiguration.DEFAULT_RM_SCHEDULER));
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
if (schedulerClass == FairScheduler.class) {
schedulerType = SchedulerType.FAIR;
configureFairScheduler(conf);
scheduler = new FairScheduler();
} else if (schedulerClass == CapacityScheduler.class) {
schedulerType = SchedulerType.CAPACITY;
scheduler = new CapacityScheduler();
((CapacityScheduler)scheduler).setConf(conf);
schedulerType = type;
switch (schedulerType) {
case FAIR:
configureFairScheduler(conf);
scheduler = new FairScheduler();
conf.set(YarnConfiguration.RM_SCHEDULER,
FairScheduler.class.getName());
break;
case CAPACITY:
scheduler = new CapacityScheduler();
((CapacityScheduler)scheduler).setConf(conf);
conf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
break;
default:
throw new IllegalArgumentException("Invalid type: " + type);
}
}
@ -85,7 +106,6 @@ public abstract class ParameterizedSchedulerTestBase {
out.println("</allocations>");
out.close();
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10);
}
@ -97,7 +117,8 @@ public abstract class ParameterizedSchedulerTestBase {
/**
* Return a scheduler configured by {@code YarnConfiguration.RM_SCHEDULER}
*
* <p>The scheduler is configured by {@link #configureScheduler()}.
* <p>The scheduler is configured by
* {@link #ParameterizedSchedulerTestBase(SchedulerType)}.
* Client test code can obtain the scheduler with this getter method.
* Schedulers supported by this class are {@link FairScheduler} or
* {@link CapacityScheduler}. </p>

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After;
@ -95,6 +96,7 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
throws Exception {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@ -41,11 +42,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
@ -54,6 +58,12 @@ import org.junit.Test;
*/
public class TestNodeBlacklistingOnAMFailures {
@Before
public void setup() {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
}
@Test(timeout = 100000)
public void testNodeBlacklistingOnAMFailure() throws Exception {
@ -361,6 +371,7 @@ public class TestNodeBlacklistingOnAMFailures {
// Now the AM container should be allocated
RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
node.nodeHeartbeat(true);
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
rm.drainEvents();
MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
rm.sendAMLaunched(attempt.getAppAttemptId());
@ -388,6 +399,7 @@ public class TestNodeBlacklistingOnAMFailures {
.println("New AppAttempt launched " + attempt.getAppAttemptId());
node.nodeHeartbeat(true);
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
rm.drainEvents();
MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);

View File

@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.Before;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
@ -89,6 +91,10 @@ public class TestRM extends ParameterizedSchedulerTestBase {
private YarnConfiguration conf;
public TestRM(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setup() {
conf = getConf();

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.security.Groups;
@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PROXY_USER_PREFIX;
@ -109,6 +111,9 @@ public class TestRMAdminService {
@Before
public void setup() throws IOException {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
configuration = new YarnConfiguration();
configuration.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());

View File

@ -147,6 +147,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
private static InetSocketAddress rmAddr;
private List<MockRM> rms = new ArrayList<MockRM>();
public TestRMRestart(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setup() throws IOException {
conf = getConf();
@ -384,6 +388,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
// assert app1 attempt is saved
attempt1 = loadedApp1.getCurrentAppAttempt();
attemptId1 = attempt1.getAppAttemptId();
((AbstractYarnScheduler)rm2.getResourceScheduler()).update();
rm2.waitForState(attemptId1, RMAppAttemptState.ALLOCATED);
appState = rmAppState.get(loadedApp1.getApplicationId());
attemptState = appState.getAttempt(attemptId1);

View File

@ -107,6 +107,10 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
MockRM rm1 = null;
MockRM rm2 = null;
public TestWorkPreservingRMRestart(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setup() throws UnknownHostException {
Logger rootLogger = LogManager.getRootLogger();

View File

@ -53,6 +53,10 @@ public class TestReservationSystem extends
private Configuration conf;
private RMContext mockRMContext;
public TestReservationSystem(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setUp() throws IOException {
scheduler = initializeScheduler();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -70,6 +71,10 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase {
private Dispatcher dispatcher;
private RMContextImpl context;
public TestNMReconnect(SchedulerType type) throws IOException {
super(type);
}
private class TestRMNodeEventDispatcher implements
EventHandler<RMNodeEvent> {

View File

@ -84,13 +84,16 @@ import org.mockito.Mockito;
@SuppressWarnings("unchecked")
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
public TestAbstractYarnScheduler(SchedulerType type) throws IOException {
super(type);
}
@Test
public void testMaximimumAllocationMemory() throws Exception {
final int node1MaxMemory = 15 * 1024;
final int node2MaxMemory = 5 * 1024;
final int node3MaxMemory = 6 * 1024;
final int configuredMaxMemory = 10 * 1024;
configureScheduler();
YarnConfiguration conf = getConf();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
configuredMaxMemory);
@ -177,7 +180,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
final int node2MaxVCores = 5;
final int node3MaxVCores = 6;
final int configuredMaxVCores = 10;
configureScheduler();
YarnConfiguration conf = getConf();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
configuredMaxVCores);
@ -381,7 +383,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
@Test(timeout = 10000)
public void testReleasedContainerIfAppAttemptisNull() throws Exception {
YarnConfiguration conf=getConf();
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
@ -425,7 +426,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
@Test(timeout=60000)
public void testContainerReleasedByNode() throws Exception {
System.out.println("Starting testContainerReleasedByNode");
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
@ -538,7 +538,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
@Test(timeout = 60000)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
@ -627,7 +626,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
public void testResourceRequestRecoveryToTheRightAppAttempt()
throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm = new MockRM(conf);
try {
@ -798,7 +796,6 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
*/
@Test(timeout = 60000)
public void testNodemanagerReconnect() throws Exception {
configureScheduler();
Configuration conf = getConf();
MockRM rm = new MockRM(conf);
try {
@ -846,4 +843,35 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
rm.stop();
}
}
@Test(timeout = 10000)
public void testUpdateThreadLifeCycle() throws Exception {
MockRM rm = new MockRM(getConf());
try {
rm.start();
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
if (getSchedulerType().equals(SchedulerType.FAIR)) {
Thread updateThread = scheduler.updateThread;
Assert.assertTrue(updateThread.isAlive());
scheduler.stop();
int numRetries = 100;
while (numRetries-- > 0 && updateThread.isAlive()) {
Thread.sleep(50);
}
Assert.assertNotEquals("The Update thread is still alive", 0, numRetries);
} else if (getSchedulerType().equals(SchedulerType.CAPACITY)) {
Assert.assertNull("updateThread shouldn't have been created",
scheduler.updateThread);
} else {
Assert.fail("Unhandled SchedulerType, " + getSchedulerType() +
", please update this unit test.");
}
} finally {
rm.stop();
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@ -46,9 +48,23 @@ public class TestSchedulingWithAllocationRequestId
LoggerFactory.getLogger(TestSchedulingWithAllocationRequestId.class);
private static final int GB = 1024;
@Test
public TestSchedulingWithAllocationRequestId(SchedulerType type) throws IOException {
super(type);
}
@Override
public YarnConfiguration getConf() {
YarnConfiguration conf = super.getConf();
if (getSchedulerType().equals(SchedulerType.FAIR)) {
// Some tests here rely on being able to assign multiple containers with
// a single heartbeat
conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
}
return conf;
}
@Test (timeout = 10000)
public void testMultipleAllocationRequestIds() throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm = new MockRM(conf);
try {
@ -63,32 +79,20 @@ public class TestSchedulingWithAllocationRequestId
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
// add request for containers with id 10 & 20
am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 1, 1, 10L);
AllocateResponse allocResponse = am1.schedule(); // send the request
am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
allocResponse = am1.schedule(); // send the request
// send requests for containers with id 10 & 20
am1.allocate(am1.createReq(
new String[] {"127.0.0.1"}, 2 * GB, 1, 1, 10L), null);
am1.allocate(am1.createReq(
new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null);
// check if request id 10 is satisfied
nm1.nodeHeartbeat(true);
allocResponse = am1.schedule(); // send the request
while (allocResponse.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am1.schedule();
}
AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
List<Container> allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(1, allocated.size());
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
// check now if request id 20 is satisfied
nm2.nodeHeartbeat(true);
while (allocResponse.getAllocatedContainers().size() < 2) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am1.schedule();
}
allocResponse = waitForAllocResponse(rm, am1, nm2, 2);
allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(2, allocated.size());
for (Container container : allocated) {
@ -101,9 +105,8 @@ public class TestSchedulingWithAllocationRequestId
}
}
@Test
@Test (timeout = 10000)
public void testMultipleAllocationRequestDiffPriority() throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm = new MockRM(conf);
try {
@ -118,20 +121,14 @@ public class TestSchedulingWithAllocationRequestId
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
// add request for containers with id 10 & 20
am1.addRequests(new String[] {"127.0.0.1" }, 2 * GB, 2, 1, 10L);
AllocateResponse allocResponse = am1.schedule(); // send the request
am1.addRequests(new String[] {"127.0.0.2" }, 2 * GB, 1, 2, 20L);
allocResponse = am1.schedule(); // send the request
// send requests for containers with id 10 & 20
am1.allocate(am1.createReq(
new String[] {"127.0.0.1"}, 2 * GB, 2, 1, 10L), null);
am1.allocate(am1.createReq(
new String[] {"127.0.0.2"}, 2 * GB, 1, 2, 20L), null);
// check if request id 20 is satisfied first
nm2.nodeHeartbeat(true);
while (allocResponse.getAllocatedContainers().size() < 2) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am1.schedule();
}
AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm2, 2);
List<Container> allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(2, allocated.size());
for (Container container : allocated) {
@ -139,13 +136,7 @@ public class TestSchedulingWithAllocationRequestId
}
// check now if request id 10 is satisfied
nm1.nodeHeartbeat(true);
allocResponse = am1.schedule(); // send the request
while (allocResponse.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am1.schedule();
}
allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(1, allocated.size());
checkAllocatedContainer(allocated.get(0), 2 * GB, nm1.getNodeId(), 10);
@ -164,9 +155,8 @@ public class TestSchedulingWithAllocationRequestId
allocated.getAllocationRequestId());
}
@Test
@Test (timeout = 10000)
public void testMultipleAppsWithAllocationReqId() throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm = new MockRM(conf);
try {
@ -190,19 +180,11 @@ public class TestSchedulingWithAllocationRequestId
// Submit app1 RR with allocationReqId = 5
int numContainers = 1;
am1.addRequests(new String[] {host0, host1 }, 1 * GB, 1, numContainers,
5L);
AllocateResponse allocResponse = am1.schedule();
// wait for containers to be allocated.
nm1.nodeHeartbeat(true);
allocResponse = am1.schedule(); // send the request
while (allocResponse.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am1.schedule();
}
am1.allocate(am1.createReq(
new String[] {host0, host1}, 1 * GB, 1, numContainers, 5L), null);
// wait for container to be allocated.
AllocateResponse allocResponse = waitForAllocResponse(rm, am1, nm1, 1);
List<Container> allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(1, allocated.size());
checkAllocatedContainer(allocated.get(0), 1 * GB, nm1.getNodeId(), 5L);
@ -212,55 +194,31 @@ public class TestSchedulingWithAllocationRequestId
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// Submit app2 RR with allocationReqId = 5
am2.addRequests(new String[] {host0, host1 }, 2 * GB, 1, numContainers,
5L);
am2.schedule();
// wait for containers to be allocated.
nm2.nodeHeartbeat(true);
allocResponse = am2.schedule(); // send the request
while (allocResponse.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am2.schedule();
}
am2.allocate(am1.createReq(
new String[] {host0, host1}, 2 * GB, 1, numContainers, 5L), null);
// wait for container to be allocated.
allocResponse = waitForAllocResponse(rm, am2, nm2, 1);
allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(1, allocated.size());
checkAllocatedContainer(allocated.get(0), 2 * GB, nm2.getNodeId(), 5L);
// Now submit app2 RR with allocationReqId = 10
am2.addRequests(new String[] {host0, host1 }, 3 * GB, 1, numContainers,
10L);
am2.schedule();
// wait for containers to be allocated.
nm1.nodeHeartbeat(true);
allocResponse = am2.schedule(); // send the request
while (allocResponse.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am2.schedule();
}
am2.allocate(am1.createReq(
new String[] {host0, host1}, 3 * GB, 1, numContainers, 10L), null);
// wait for container to be allocated.
allocResponse = waitForAllocResponse(rm, am2, nm1, 1);
allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(1, allocated.size());
checkAllocatedContainer(allocated.get(0), 3 * GB, nm1.getNodeId(), 10L);
// Now submit app1 RR with allocationReqId = 10
am1.addRequests(new String[] {host0, host1 }, 4 * GB, 1, numContainers,
10L);
am1.schedule();
// wait for containers to be allocated.
nm2.nodeHeartbeat(true);
allocResponse = am1.schedule(); // send the request
while (allocResponse.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
allocResponse = am1.schedule();
}
am1.allocate(am1.createReq(
new String[] {host0, host1}, 4 * GB, 1, numContainers, 10L), null);
// wait for container to be allocated.
allocResponse = waitForAllocResponse(rm, am1, nm2, 1);
allocated = allocResponse.getAllocatedContainers();
Assert.assertEquals(1, allocated.size());
checkAllocatedContainer(allocated.get(0), 4 * GB, nm2.getNodeId(), 10L);
@ -271,4 +229,17 @@ public class TestSchedulingWithAllocationRequestId
}
}
private AllocateResponse waitForAllocResponse(MockRM rm, MockAM am, MockNM nm,
int size) throws Exception {
AllocateResponse allocResponse = am.doHeartbeat();
while (allocResponse.getAllocatedContainers().size() < size) {
LOG.info("Waiting for containers to be created for app...");
nm.nodeHeartbeat(true);
((AbstractYarnScheduler) rm.getResourceScheduler()).update();
Thread.sleep(100);
allocResponse = am.doHeartbeat();
}
return allocResponse;
}
}

View File

@ -285,24 +285,19 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
}
@Test
public void testThreadLifeCycle() throws InterruptedException {
public void testSchedulerThreadLifeCycle() throws InterruptedException {
scheduler.start();
Thread updateThread = scheduler.updateThread;
Thread schedulingThread = scheduler.schedulingThread;
assertTrue(updateThread.isAlive());
assertTrue(schedulingThread.isAlive());
scheduler.stop();
int numRetries = 100;
while (numRetries-- > 0 &&
(updateThread.isAlive() || schedulingThread.isAlive())) {
while (numRetries-- > 0 && schedulingThread.isAlive()) {
Thread.sleep(50);
}
assertNotEquals("One of the threads is still alive", 0, numRetries);
assertNotEquals("The Scheduling thread is still alive", 0, numRetries);
}
@Test

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@ -148,7 +149,10 @@ public class TestFairOrderingPolicy {
// Define top-level queues
String queuePath = CapacitySchedulerConfiguration.ROOT + ".default";
csConf.setOrderingPolicy(queuePath, CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
csConf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());
csConf.setOrderingPolicy(queuePath,
CapacitySchedulerConfiguration.FAIR_APP_ORDERING_POLICY);
csConf.setOrderingPolicyParameter(queuePath,
FairOrderingPolicy.ENABLE_SIZE_BASED_WEIGHT, "true");
csConf.setMaximumApplicationMasterResourcePerQueuePercent(queuePath, 0.1f);

View File

@ -86,6 +86,10 @@ import static org.mockito.Mockito.when;
public class TestClientToAMTokens extends ParameterizedSchedulerTestBase {
private YarnConfiguration conf;
public TestClientToAMTokens(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setup() {
conf = getConf();