YARN-5948. Implement MutableConfigurationManager for handling storage into configuration store
This commit is contained in:
parent
38cf9ccd5b
commit
072527b31a
@ -609,6 +609,12 @@ public static boolean isAclEnabled(Configuration conf) {
|
|||||||
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
|
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
|
||||||
"org.apache.hadoop.yarn.LocalConfigurationProvider";
|
"org.apache.hadoop.yarn.LocalConfigurationProvider";
|
||||||
|
|
||||||
|
public static final String SCHEDULER_CONFIGURATION_STORE_CLASS =
|
||||||
|
YARN_PREFIX + "scheduler.configuration.store.class";
|
||||||
|
public static final String MEMORY_CONFIGURATION_STORE = "memory";
|
||||||
|
public static final String DEFAULT_CONFIGURATION_STORE =
|
||||||
|
MEMORY_CONFIGURATION_STORE;
|
||||||
|
|
||||||
public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX
|
public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX
|
||||||
+ "authorization-provider";
|
+ "authorization-provider";
|
||||||
private static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS_HTTP =
|
private static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS_HTTP =
|
||||||
|
@ -3192,4 +3192,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>
|
||||||
|
The type of configuration store to use for storing scheduler
|
||||||
|
configurations, if using a mutable configuration provider.
|
||||||
|
Keywords such as "memory" map to certain configuration store
|
||||||
|
implementations. If keyword is not found, try to load this
|
||||||
|
value as a class.
|
||||||
|
</description>
|
||||||
|
<name>yarn.scheduler.configuration.store.class</name>
|
||||||
|
<value>memory</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
@ -0,0 +1,35 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface for allowing changing scheduler configurations.
|
||||||
|
*/
|
||||||
|
public interface MutableConfigurationProvider {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the scheduler configuration with the provided key value pairs.
|
||||||
|
* @param user User issuing the request
|
||||||
|
* @param confUpdate Key-value pairs for configurations to be updated.
|
||||||
|
*/
|
||||||
|
void mutateConfiguration(String user, Map<String, String> confUpdate);
|
||||||
|
|
||||||
|
}
|
@ -109,6 +109,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
@ -296,10 +297,15 @@ void initScheduler(Configuration configuration) throws
|
|||||||
String confProviderStr = configuration.get(
|
String confProviderStr = configuration.get(
|
||||||
CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
|
CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
|
||||||
CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER);
|
CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER);
|
||||||
if (confProviderStr.equals(
|
switch (confProviderStr) {
|
||||||
CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) {
|
case CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER:
|
||||||
this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext);
|
this.csConfProvider =
|
||||||
} else {
|
new FileBasedCSConfigurationProvider(rmContext);
|
||||||
|
break;
|
||||||
|
case CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER:
|
||||||
|
this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
throw new IOException("Invalid CS configuration provider: " +
|
throw new IOException("Invalid CS configuration provider: " +
|
||||||
confProviderStr);
|
confProviderStr);
|
||||||
}
|
}
|
||||||
|
@ -322,6 +322,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||||||
@Private
|
@Private
|
||||||
public static final String FILE_CS_CONF_PROVIDER = "file";
|
public static final String FILE_CS_CONF_PROVIDER = "file";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String STORE_CS_CONF_PROVIDER = "store";
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER;
|
public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER;
|
||||||
|
|
||||||
|
@ -32,8 +32,9 @@ public interface CSConfigurationProvider {
|
|||||||
/**
|
/**
|
||||||
* Initialize the configuration provider with given conf.
|
* Initialize the configuration provider with given conf.
|
||||||
* @param conf configuration to initialize with
|
* @param conf configuration to initialize with
|
||||||
|
* @throws IOException if initialization fails due to misconfiguration
|
||||||
*/
|
*/
|
||||||
void init(Configuration conf);
|
void init(Configuration conf) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loads capacity scheduler configuration object.
|
* Loads capacity scheduler configuration object.
|
||||||
|
@ -0,0 +1,94 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* CS configuration provider which implements
|
||||||
|
* {@link MutableConfigurationProvider} for modifying capacity scheduler
|
||||||
|
* configuration.
|
||||||
|
*/
|
||||||
|
public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
|
MutableConfigurationProvider {
|
||||||
|
|
||||||
|
private Configuration schedConf;
|
||||||
|
private YarnConfigurationStore confStore;
|
||||||
|
private RMContext rmContext;
|
||||||
|
private Configuration conf;
|
||||||
|
|
||||||
|
public MutableCSConfigurationProvider(RMContext rmContext) {
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Configuration config) throws IOException {
|
||||||
|
String store = config.get(
|
||||||
|
YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
|
||||||
|
YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
|
||||||
|
switch (store) {
|
||||||
|
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
|
||||||
|
this.confStore = new InMemoryConfigurationStore();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
this.confStore = YarnConfigurationStoreFactory.getStore(config);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Configuration initialSchedConf = new Configuration(false);
|
||||||
|
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
|
||||||
|
this.schedConf = initialSchedConf;
|
||||||
|
confStore.initialize(config, initialSchedConf);
|
||||||
|
this.conf = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CapacitySchedulerConfiguration loadConfiguration(Configuration
|
||||||
|
configuration) throws IOException {
|
||||||
|
Configuration loadedConf = new Configuration(configuration);
|
||||||
|
loadedConf.addResource(schedConf);
|
||||||
|
return new CapacitySchedulerConfiguration(loadedConf, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mutateConfiguration(String user,
|
||||||
|
Map<String, String> confUpdate) {
|
||||||
|
Configuration oldConf = new Configuration(schedConf);
|
||||||
|
LogMutation log = new LogMutation(confUpdate, user);
|
||||||
|
long id = confStore.logMutation(log);
|
||||||
|
for (Map.Entry<String, String> kv : confUpdate.entrySet()) {
|
||||||
|
schedConf.set(kv.getKey(), kv.getValue());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
rmContext.getScheduler().reinitialize(conf, rmContext);
|
||||||
|
} catch (IOException e) {
|
||||||
|
schedConf = oldConf;
|
||||||
|
confStore.confirmMutation(id, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
confStore.confirmMutation(id, true);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,46 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Factory class for creating instances of {@link YarnConfigurationStore}.
|
||||||
|
*/
|
||||||
|
public final class YarnConfigurationStoreFactory {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(
|
||||||
|
YarnConfigurationStoreFactory.class);
|
||||||
|
|
||||||
|
private YarnConfigurationStoreFactory() {
|
||||||
|
// Unused.
|
||||||
|
}
|
||||||
|
|
||||||
|
public static YarnConfigurationStore getStore(Configuration conf) {
|
||||||
|
Class<? extends YarnConfigurationStore> storeClass =
|
||||||
|
conf.getClass(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
|
||||||
|
InMemoryConfigurationStore.class, YarnConfigurationStore.class);
|
||||||
|
LOG.info("Using YarnConfigurationStore implementation - " + storeClass);
|
||||||
|
return ReflectionUtils.newInstance(storeClass, conf);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doNothing;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests {@link MutableCSConfigurationProvider}.
|
||||||
|
*/
|
||||||
|
public class TestMutableCSConfigurationProvider {
|
||||||
|
|
||||||
|
private MutableCSConfigurationProvider confProvider;
|
||||||
|
private RMContext rmContext;
|
||||||
|
private Map<String, String> goodUpdate;
|
||||||
|
private Map<String, String> badUpdate;
|
||||||
|
private CapacityScheduler cs;
|
||||||
|
|
||||||
|
private static final String TEST_USER = "testUser";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
cs = mock(CapacityScheduler.class);
|
||||||
|
rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getScheduler()).thenReturn(cs);
|
||||||
|
confProvider = new MutableCSConfigurationProvider(rmContext);
|
||||||
|
goodUpdate = new HashMap<>();
|
||||||
|
goodUpdate.put("goodKey", "goodVal");
|
||||||
|
badUpdate = new HashMap<>();
|
||||||
|
badUpdate.put("badKey", "badVal");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testInMemoryBackedProvider() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
confProvider.init(conf);
|
||||||
|
assertNull(confProvider.loadConfiguration(conf)
|
||||||
|
.get("goodKey"));
|
||||||
|
|
||||||
|
doNothing().when(cs).reinitialize(any(Configuration.class),
|
||||||
|
any(RMContext.class));
|
||||||
|
confProvider.mutateConfiguration(TEST_USER, goodUpdate);
|
||||||
|
assertEquals("goodVal", confProvider.loadConfiguration(conf)
|
||||||
|
.get("goodKey"));
|
||||||
|
|
||||||
|
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
|
||||||
|
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
|
||||||
|
any(RMContext.class));
|
||||||
|
confProvider.mutateConfiguration(TEST_USER, badUpdate);
|
||||||
|
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user