YARN-7003. DRAINING state of queues is not recovered after RM restart. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2018-05-11 10:47:04 +08:00
parent d76fbbc9b8
commit 9db9cd95bd
3 changed files with 82 additions and 0 deletions

View File

@ -1244,4 +1244,19 @@ public abstract class AbstractCSQueue implements CSQueue {
public Map<String, Float> getUserWeights() {
return userWeights;
}
public void recoverDrainingState() {
try {
this.writeLock.lock();
if (getState() == QueueState.STOPPED) {
updateQueueState(QueueState.DRAINING);
}
LOG.info("Recover draining state for queue " + this.getQueuePath());
if (getParent() != null && getParent().getState() == QueueState.STOPPED) {
((AbstractCSQueue) getParent()).recoverDrainingState();
}
} finally {
this.writeLock.unlock();
}
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -808,6 +809,12 @@ public class CapacityScheduler extends
throw new QueueInvalidException(queueErrorMsg);
}
}
// When recovering apps in this queue but queue is in STOPPED state,
// that means its previous state was DRAINING. So we auto transit
// the state to DRAINING for recovery.
if (queue.getState() == QueueState.STOPPED) {
((LeafQueue) queue).recoverDrainingState();
}
// Submit to the queue
try {
queue.submitApplication(applicationId, user, queueName);

View File

@ -32,7 +32,12 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
@ -197,4 +202,59 @@ public class TestQueueState {
.thenCallRealMethod();
return application;
}
@Test (timeout = 30000)
public void testRecoverDrainingStateAfterRMRestart() throws Exception {
// init conf
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration();
newConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
newConf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
false);
newConf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
newConf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
newConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{Q1});
newConf.setQueues(Q1_PATH, new String[]{Q2});
newConf.setCapacity(Q1_PATH, 100);
newConf.setCapacity(Q2_PATH, 100);
// init state store
MemoryRMStateStore newMemStore = new MemoryRMStateStore();
newMemStore.init(newConf);
// init RM & NMs & Nodes
MockRM rm = new MockRM(newConf, newMemStore);
rm.start();
MockNM nm = rm.registerNode("h1:1234", 204800);
// submit an app, AM is running on nm1
RMApp app = rm.submitApp(1024, "appname", "appuser", null, Q2);
MockRM.launchAM(app, rm, nm);
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
// update queue state to STOPPED
newConf.setState(Q1_PATH, QueueState.STOPPED);
CapacityScheduler capacityScheduler =
(CapacityScheduler) rm.getRMContext().getScheduler();
capacityScheduler.reinitialize(newConf, rm.getRMContext());
// current queue state should be DRAINING
Assert.assertEquals(QueueState.DRAINING,
capacityScheduler.getQueue(Q2).getState());
Assert.assertEquals(QueueState.DRAINING,
capacityScheduler.getQueue(Q1).getState());
// RM restart
rm = new MockRM(newConf, newMemStore);
rm.start();
rm.registerNode("h1:1234", 204800);
// queue state should be DRAINING after app recovered
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
capacityScheduler = (CapacityScheduler) rm.getRMContext().getScheduler();
Assert.assertEquals(QueueState.DRAINING,
capacityScheduler.getQueue(Q2).getState());
Assert.assertEquals(QueueState.DRAINING,
capacityScheduler.getQueue(Q1).getState());
// close rm
rm.close();
}
}