YARN-6322: Disable queue refresh when configuration mutation is enabled. Contributed by Jonathan Hung
This commit is contained in:
parent
74ba6ffa0b
commit
4c8b208adb
|
@ -29,6 +29,7 @@ import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
@ -92,6 +93,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSyst
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -384,6 +387,12 @@ public class AdminService extends CompositeService implements
|
||||||
RefreshQueuesResponse response =
|
RefreshQueuesResponse response =
|
||||||
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
|
recordFactory.newRecordInstance(RefreshQueuesResponse.class);
|
||||||
try {
|
try {
|
||||||
|
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
|
||||||
|
if (scheduler instanceof MutableConfScheduler
|
||||||
|
&& ((MutableConfScheduler) scheduler).isConfigurationMutable()) {
|
||||||
|
throw new IOException("Scheduler configuration is mutable. " +
|
||||||
|
operation + " is not allowed in this scenario.");
|
||||||
|
}
|
||||||
refreshQueues();
|
refreshQueues();
|
||||||
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
|
RMAuditLogger.logSuccess(user.getShortUserName(), operation,
|
||||||
"AdminService");
|
"AdminService");
|
||||||
|
@ -393,7 +402,8 @@ public class AdminService extends CompositeService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void refreshQueues() throws IOException, YarnException {
|
@Private
|
||||||
|
public void refreshQueues() throws IOException, YarnException {
|
||||||
rm.getRMContext().getScheduler().reinitialize(getConfig(),
|
rm.getRMContext().getScheduler().reinitialize(getConfig(),
|
||||||
this.rm.getRMContext());
|
this.rm.getRMContext());
|
||||||
// refresh the reservation system
|
// refresh the reservation system
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -33,10 +34,11 @@ public interface MutableConfScheduler extends ResourceScheduler {
|
||||||
* Update the scheduler's configuration.
|
* Update the scheduler's configuration.
|
||||||
* @param user Caller of this update
|
* @param user Caller of this update
|
||||||
* @param confUpdate configuration update
|
* @param confUpdate configuration update
|
||||||
* @throws IOException if update is invalid
|
* @throws IOException if scheduler could not be reinitialized
|
||||||
|
* @throws YarnException if reservation system could not be reinitialized
|
||||||
*/
|
*/
|
||||||
void updateConfiguration(UserGroupInformation user,
|
void updateConfiguration(UserGroupInformation user,
|
||||||
SchedConfUpdateInfo confUpdate) throws IOException;
|
SchedConfUpdateInfo confUpdate) throws IOException, YarnException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the scheduler configuration.
|
* Get the scheduler configuration.
|
||||||
|
@ -50,4 +52,10 @@ public interface MutableConfScheduler extends ResourceScheduler {
|
||||||
* @return the queue object
|
* @return the queue object
|
||||||
*/
|
*/
|
||||||
Queue getQueue(String queueName);
|
Queue getQueue(String queueName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return whether the scheduler configuration is mutable.
|
||||||
|
* @return whether scheduler configuration is mutable or not.
|
||||||
|
*/
|
||||||
|
boolean isConfigurationMutable();
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -39,8 +40,9 @@ public interface MutableConfigurationProvider {
|
||||||
* @param user User issuing the request
|
* @param user User issuing the request
|
||||||
* @param confUpdate Key-value pairs for configurations to be updated.
|
* @param confUpdate Key-value pairs for configurations to be updated.
|
||||||
* @throws IOException if scheduler could not be reinitialized
|
* @throws IOException if scheduler could not be reinitialized
|
||||||
|
* @throws YarnException if reservation system could not be reinitialized
|
||||||
*/
|
*/
|
||||||
void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
|
void mutateConfiguration(UserGroupInformation user, SchedConfUpdateInfo
|
||||||
confUpdate) throws IOException;
|
confUpdate) throws IOException, YarnException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2620,8 +2620,8 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateConfiguration(UserGroupInformation user,
|
public void updateConfiguration(UserGroupInformation user,
|
||||||
SchedConfUpdateInfo confUpdate) throws IOException {
|
SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
|
||||||
if (csConfProvider instanceof MutableConfigurationProvider) {
|
if (isConfigurationMutable()) {
|
||||||
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
|
((MutableConfigurationProvider) csConfProvider).mutateConfiguration(
|
||||||
user, confUpdate);
|
user, confUpdate);
|
||||||
} else {
|
} else {
|
||||||
|
@ -2629,4 +2629,9 @@ public class CapacityScheduler extends
|
||||||
"provider does not support updating configuration.");
|
"provider does not support updating configuration.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isConfigurationMutable() {
|
||||||
|
return csConfProvider instanceof MutableConfigurationProvider;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory;
|
||||||
|
@ -58,7 +59,6 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
private YarnConfigurationStore confStore;
|
private YarnConfigurationStore confStore;
|
||||||
private ConfigurationMutationACLPolicy aclMutationPolicy;
|
private ConfigurationMutationACLPolicy aclMutationPolicy;
|
||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
private Configuration conf;
|
|
||||||
|
|
||||||
public MutableCSConfigurationProvider(RMContext rmContext) {
|
public MutableCSConfigurationProvider(RMContext rmContext) {
|
||||||
this.rmContext = rmContext;
|
this.rmContext = rmContext;
|
||||||
|
@ -96,7 +96,6 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
|
this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory
|
||||||
.getPolicy(config);
|
.getPolicy(config);
|
||||||
aclMutationPolicy.init(config, rmContext);
|
aclMutationPolicy.init(config, rmContext);
|
||||||
this.conf = config;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -109,7 +108,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void mutateConfiguration(UserGroupInformation user,
|
public synchronized void mutateConfiguration(UserGroupInformation user,
|
||||||
SchedConfUpdateInfo confUpdate) throws IOException {
|
SchedConfUpdateInfo confUpdate) throws IOException, YarnException {
|
||||||
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
|
if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) {
|
||||||
throw new AccessControlException("User is not admin of all modified" +
|
throw new AccessControlException("User is not admin of all modified" +
|
||||||
" queues.");
|
" queues.");
|
||||||
|
@ -126,8 +125,8 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
rmContext.getScheduler().reinitialize(conf, rmContext);
|
rmContext.getRMAdminService().refreshQueues();
|
||||||
} catch (IOException e) {
|
} catch (IOException | YarnException e) {
|
||||||
schedConf = oldConf;
|
schedConf = oldConf;
|
||||||
confStore.confirmMutation(id, false);
|
confStore.confirmMutation(id, false);
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -148,7 +147,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
rmContext.getScheduler().reinitialize(conf, rmContext);
|
rmContext.getScheduler().reinitialize(schedConf, rmContext);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
schedConf = oldConf;
|
schedConf = oldConf;
|
||||||
confStore.confirmMutation(mutation.getId(), false);
|
confStore.confirmMutation(mutation.getId(), false);
|
||||||
|
|
|
@ -197,6 +197,29 @@ public class TestRMAdminService {
|
||||||
Assert.assertTrue(maxAppsAfter != maxAppsBefore);
|
Assert.assertTrue(maxAppsAfter != maxAppsBefore);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAdminRefreshQueuesWithMutableSchedulerConfiguration() {
|
||||||
|
configuration.set(CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
|
||||||
|
CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER);
|
||||||
|
|
||||||
|
try {
|
||||||
|
rm = new MockRM(configuration);
|
||||||
|
rm.init(configuration);
|
||||||
|
rm.start();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
fail("Should not get any exceptions");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
rm.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
|
||||||
|
fail("Expected exception while calling refreshQueues when scheduler" +
|
||||||
|
" configuration is mutable.");
|
||||||
|
} catch (Exception ex) {
|
||||||
|
assertTrue(ex.getMessage().endsWith("Scheduler configuration is " +
|
||||||
|
"mutable. refreshQueues is not allowed in this scenario."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAdminRefreshNodesWithoutConfiguration()
|
public void testAdminRefreshNodesWithoutConfiguration()
|
||||||
throws IOException, YarnException {
|
throws IOException, YarnException {
|
||||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
@ -34,7 +36,6 @@ import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -50,6 +51,7 @@ public class TestMutableCSConfigurationProvider {
|
||||||
private SchedConfUpdateInfo goodUpdate;
|
private SchedConfUpdateInfo goodUpdate;
|
||||||
private SchedConfUpdateInfo badUpdate;
|
private SchedConfUpdateInfo badUpdate;
|
||||||
private CapacityScheduler cs;
|
private CapacityScheduler cs;
|
||||||
|
private AdminService adminService;
|
||||||
|
|
||||||
private static final UserGroupInformation TEST_USER = UserGroupInformation
|
private static final UserGroupInformation TEST_USER = UserGroupInformation
|
||||||
.createUserForTesting("testUser", new String[] {});
|
.createUserForTesting("testUser", new String[] {});
|
||||||
|
@ -61,6 +63,8 @@ public class TestMutableCSConfigurationProvider {
|
||||||
when(rmContext.getScheduler()).thenReturn(cs);
|
when(rmContext.getScheduler()).thenReturn(cs);
|
||||||
when(cs.getConfiguration()).thenReturn(
|
when(cs.getConfiguration()).thenReturn(
|
||||||
new CapacitySchedulerConfiguration());
|
new CapacitySchedulerConfiguration());
|
||||||
|
adminService = mock(AdminService.class);
|
||||||
|
when(rmContext.getRMAdminService()).thenReturn(adminService);
|
||||||
confProvider = new MutableCSConfigurationProvider(rmContext);
|
confProvider = new MutableCSConfigurationProvider(rmContext);
|
||||||
goodUpdate = new SchedConfUpdateInfo();
|
goodUpdate = new SchedConfUpdateInfo();
|
||||||
Map<String, String> goodUpdateMap = new HashMap<>();
|
Map<String, String> goodUpdateMap = new HashMap<>();
|
||||||
|
@ -78,22 +82,20 @@ public class TestMutableCSConfigurationProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInMemoryBackedProvider() throws IOException {
|
public void testInMemoryBackedProvider() throws IOException, YarnException {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
confProvider.init(conf);
|
confProvider.init(conf);
|
||||||
assertNull(confProvider.loadConfiguration(conf)
|
assertNull(confProvider.loadConfiguration(conf)
|
||||||
.get("yarn.scheduler.capacity.root.a.goodKey"));
|
.get("yarn.scheduler.capacity.root.a.goodKey"));
|
||||||
|
|
||||||
doNothing().when(cs).reinitialize(any(Configuration.class),
|
doNothing().when(adminService).refreshQueues();
|
||||||
any(RMContext.class));
|
|
||||||
confProvider.mutateConfiguration(TEST_USER, goodUpdate);
|
confProvider.mutateConfiguration(TEST_USER, goodUpdate);
|
||||||
assertEquals("goodVal", confProvider.loadConfiguration(conf)
|
assertEquals("goodVal", confProvider.loadConfiguration(conf)
|
||||||
.get("yarn.scheduler.capacity.root.a.goodKey"));
|
.get("yarn.scheduler.capacity.root.a.goodKey"));
|
||||||
|
|
||||||
assertNull(confProvider.loadConfiguration(conf).get(
|
assertNull(confProvider.loadConfiguration(conf).get(
|
||||||
"yarn.scheduler.capacity.root.a.badKey"));
|
"yarn.scheduler.capacity.root.a.badKey"));
|
||||||
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
|
doThrow(new IOException()).when(adminService).refreshQueues();
|
||||||
any(RMContext.class));
|
|
||||||
try {
|
try {
|
||||||
confProvider.mutateConfiguration(TEST_USER, badUpdate);
|
confProvider.mutateConfiguration(TEST_USER, badUpdate);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
Loading…
Reference in New Issue