YARN-5948. Implement MutableConfigurationManager for handling storage into configuration store

This commit is contained in:
Jonathan Hung 2017-03-01 16:03:01 -08:00
parent c4c2feeccf
commit 28754646e5
9 changed files with 291 additions and 5 deletions

View File

@ -645,6 +645,12 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS =
"org.apache.hadoop.yarn.LocalConfigurationProvider";
public static final String SCHEDULER_CONFIGURATION_STORE_CLASS =
YARN_PREFIX + "scheduler.configuration.store.class";
public static final String MEMORY_CONFIGURATION_STORE = "memory";
public static final String DEFAULT_CONFIGURATION_STORE =
MEMORY_CONFIGURATION_STORE;
public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX
+ "authorization-provider";
private static final List<String> RM_SERVICES_ADDRESS_CONF_KEYS_HTTP =

View File

@ -3340,4 +3340,16 @@
</description>
</property>
<property>
<description>
The type of configuration store to use for storing scheduler
configurations, if using a mutable configuration provider.
Keywords such as "memory" map to certain configuration store
implementations. If keyword is not found, try to load this
value as a class.
</description>
<name>yarn.scheduler.configuration.store.class</name>
<value>memory</value>
</property>
</configuration>

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.Map;
/**
* Interface for allowing changing scheduler configurations.
*/
public interface MutableConfigurationProvider {
/**
* Update the scheduler configuration with the provided key value pairs.
* @param user User issuing the request
* @param confUpdate Key-value pairs for configurations to be updated.
*/
void mutateConfiguration(String user, Map<String, String> confUpdate);
}

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
@ -295,10 +296,15 @@ public class CapacityScheduler extends
String confProviderStr = configuration.get(
CapacitySchedulerConfiguration.CS_CONF_PROVIDER,
CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER);
if (confProviderStr.equals(
CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) {
this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext);
} else {
switch (confProviderStr) {
case CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER:
this.csConfProvider =
new FileBasedCSConfigurationProvider(rmContext);
break;
case CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER:
this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
break;
default:
throw new IOException("Invalid CS configuration provider: " +
confProviderStr);
}

View File

@ -322,6 +322,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final String FILE_CS_CONF_PROVIDER = "file";
@Private
public static final String STORE_CS_CONF_PROVIDER = "store";
@Private
public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER;

View File

@ -32,8 +32,9 @@ public interface CSConfigurationProvider {
/**
* Initialize the configuration provider with given conf.
* @param conf configuration to initialize with
* @throws IOException if initialization fails due to misconfiguration
*/
void init(Configuration conf);
void init(Configuration conf) throws IOException;
/**
* Loads capacity scheduler configuration object.

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import java.io.IOException;
import java.util.Map;
/**
* CS configuration provider which implements
* {@link MutableConfigurationProvider} for modifying capacity scheduler
* configuration.
*/
public class MutableCSConfigurationProvider implements CSConfigurationProvider,
MutableConfigurationProvider {
private Configuration schedConf;
private YarnConfigurationStore confStore;
private RMContext rmContext;
private Configuration conf;
public MutableCSConfigurationProvider(RMContext rmContext) {
this.rmContext = rmContext;
}
@Override
public void init(Configuration config) throws IOException {
String store = config.get(
YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
switch (store) {
case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
this.confStore = new InMemoryConfigurationStore();
break;
default:
this.confStore = YarnConfigurationStoreFactory.getStore(config);
break;
}
Configuration initialSchedConf = new Configuration(false);
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
this.schedConf = initialSchedConf;
confStore.initialize(config, initialSchedConf);
this.conf = config;
}
@Override
public CapacitySchedulerConfiguration loadConfiguration(Configuration
configuration) throws IOException {
Configuration loadedConf = new Configuration(configuration);
loadedConf.addResource(schedConf);
return new CapacitySchedulerConfiguration(loadedConf, false);
}
@Override
public void mutateConfiguration(String user,
Map<String, String> confUpdate) {
Configuration oldConf = new Configuration(schedConf);
LogMutation log = new LogMutation(confUpdate, user);
long id = confStore.logMutation(log);
for (Map.Entry<String, String> kv : confUpdate.entrySet()) {
schedConf.set(kv.getKey(), kv.getValue());
}
try {
rmContext.getScheduler().reinitialize(conf, rmContext);
} catch (IOException e) {
schedConf = oldConf;
confStore.confirmMutation(id, false);
return;
}
confStore.confirmMutation(id, true);
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Factory class for creating instances of {@link YarnConfigurationStore}.
*/
public final class YarnConfigurationStoreFactory {
private static final Log LOG = LogFactory.getLog(
YarnConfigurationStoreFactory.class);
private YarnConfigurationStoreFactory() {
// Unused.
}
public static YarnConfigurationStore getStore(Configuration conf) {
Class<? extends YarnConfigurationStore> storeClass =
conf.getClass(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
InMemoryConfigurationStore.class, YarnConfigurationStore.class);
LOG.info("Using YarnConfigurationStore implementation - " + storeClass);
return ReflectionUtils.newInstance(storeClass, conf);
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests {@link MutableCSConfigurationProvider}.
*/
public class TestMutableCSConfigurationProvider {
private MutableCSConfigurationProvider confProvider;
private RMContext rmContext;
private Map<String, String> goodUpdate;
private Map<String, String> badUpdate;
private CapacityScheduler cs;
private static final String TEST_USER = "testUser";
@Before
public void setUp() {
cs = mock(CapacityScheduler.class);
rmContext = mock(RMContext.class);
when(rmContext.getScheduler()).thenReturn(cs);
confProvider = new MutableCSConfigurationProvider(rmContext);
goodUpdate = new HashMap<>();
goodUpdate.put("goodKey", "goodVal");
badUpdate = new HashMap<>();
badUpdate.put("badKey", "badVal");
}
@Test
public void testInMemoryBackedProvider() throws IOException {
Configuration conf = new Configuration();
confProvider.init(conf);
assertNull(confProvider.loadConfiguration(conf)
.get("goodKey"));
doNothing().when(cs).reinitialize(any(Configuration.class),
any(RMContext.class));
confProvider.mutateConfiguration(TEST_USER, goodUpdate);
assertEquals("goodVal", confProvider.loadConfiguration(conf)
.get("goodKey"));
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class),
any(RMContext.class));
confProvider.mutateConfiguration(TEST_USER, badUpdate);
assertNull(confProvider.loadConfiguration(conf).get("badKey"));
}
}