YARN-7252. Removing queue then failing over results in exception

This commit is contained in:
Jonathan Hung 2017-09-26 11:41:05 -07:00
parent 09c5dfe937
commit 1d36b53ab6
3 changed files with 117 additions and 2 deletions

View File

@ -88,4 +88,10 @@ public interface CapacitySchedulerContext {
* @return Max Cluster level App priority.
*/
Priority getMaxClusterLevelAppPriority();
/**
* Returns if configuration is mutable.
* @return if configuration is mutable
*/
boolean isConfigurationMutable();
}

View File

@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
@ -170,8 +171,14 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
// When failing over, if using configuration store, don't validate queue
// hierarchy since queues can be removed without being STOPPED.
if (!csContext.isConfigurationMutable() ||
csContext.getRMContext().getHAServiceState()
!= HAServiceProtocol.HAServiceState.STANDBY) {
// Ensure queue hiearchy in the new XML file is proper.
validateQueueHierarchy(queues, newQueues);
}
// Add new queues and delete OldQeueus only after validation.
updateQueues(queues, newQueues);

View File

@ -38,18 +38,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfSchedu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests {@link ZKConfigurationStore}.
@ -303,6 +306,105 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
rm2.close();
}
/**
* When failing over, if RM1 stopped and removed a queue that RM2 has in
* memory, failing over to RM2 should not throw an exception.
* @throws Exception
*/
@Test
public void testFailoverAfterRemoveQueue() throws Exception {
HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
Configuration conf1 = createRMHAConf("rm1,rm2", "rm1", 1234);
ResourceManager rm1 = new MockRM(conf1);
rm1.start();
rm1.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm1.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
Configuration conf2 = createRMHAConf("rm1,rm2", "rm2", 5678);
ResourceManager rm2 = new MockRM(conf2);
rm2.start();
assertEquals("RM should be Standby",
HAServiceProtocol.HAServiceState.STANDBY,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
UserGroupInformation user = UserGroupInformation
.createUserForTesting(TEST_USER, new String[0]);
MutableConfigurationProvider confProvider = ((MutableConfScheduler)
rm1.getResourceScheduler()).getMutableConfProvider();
// Add root.a
SchedConfUpdateInfo schedConfUpdateInfo = new SchedConfUpdateInfo();
Map<String, String> addParams = new HashMap<>();
addParams.put("capacity", "100");
QueueConfigInfo addInfo = new QueueConfigInfo("root.a", addParams);
schedConfUpdateInfo.getAddQueueInfo().add(addInfo);
// Stop root.default
Map<String, String> stopParams = new HashMap<>();
stopParams.put("state", "STOPPED");
stopParams.put("capacity", "0");
QueueConfigInfo stopInfo = new QueueConfigInfo("root.default", stopParams);
schedConfUpdateInfo.getUpdateQueueInfo().add(stopInfo);
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
confProvider.confirmPendingMutation(true);
assertTrue(Arrays.asList(((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("yarn.scheduler.capacity.root.queues").split
(",")).contains("a"));
// Remove root.default
schedConfUpdateInfo.getUpdateQueueInfo().clear();
schedConfUpdateInfo.getAddQueueInfo().clear();
schedConfUpdateInfo.getRemoveQueueInfo().add("root.default");
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.getResourceScheduler().reinitialize(conf1, rm1.getRMContext());
confProvider.confirmPendingMutation(true);
assertEquals("a", ((MutableConfScheduler) rm1.getResourceScheduler())
.getConfiguration().get("yarn.scheduler.capacity.root.queues"));
// Start RM2 and verifies it starts with updated configuration
rm2.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm2.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
if (HAServiceProtocol.HAServiceState.ACTIVE ==
rm1.getRMContext().getRMAdminService().getServiceStatus()
.getState()) {
Thread.sleep(100);
}
}
assertEquals("RM should have been fenced",
HAServiceProtocol.HAServiceState.STANDBY,
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
assertEquals("a", ((MutableCSConfigurationProvider) (
(CapacityScheduler) rm2.getResourceScheduler())
.getMutableConfProvider()).getConfStore().retrieve()
.get("yarn.scheduler.capacity.root.queues"));
assertEquals("a", ((MutableConfScheduler) rm2.getResourceScheduler())
.getConfiguration().get("yarn.scheduler.capacity.root.queues"));
// Transition to standby will set RM's HA status and then reinitialize in
// a separate thread. Despite asserting for STANDBY state, it's
// possible for reinitialization to be unfinished. Wait here for it to
// finish, otherwise closing rm1 will close zkManager and the unfinished
// reinitialization will throw an exception.
Thread.sleep(10000);
rm1.close();
rm2.close();
}
@Override
public YarnConfigurationStore createConfStore() {
return new ZKConfigurationStore();