From 28754646e5ea55eae3015306500bde0e7bcac601 Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Wed, 1 Mar 2017 16:03:01 -0800 Subject: [PATCH] YARN-5948. Implement MutableConfigurationManager for handling storage into configuration store --- .../hadoop/yarn/conf/YarnConfiguration.java | 6 ++ .../src/main/resources/yarn-default.xml | 12 +++ .../MutableConfigurationProvider.java | 35 +++++++ .../scheduler/capacity/CapacityScheduler.java | 14 ++- .../CapacitySchedulerConfiguration.java | 3 + .../conf/CSConfigurationProvider.java | 3 +- .../conf/MutableCSConfigurationProvider.java | 94 +++++++++++++++++++ .../conf/YarnConfigurationStoreFactory.java | 46 +++++++++ .../TestMutableCSConfigurationProvider.java | 83 ++++++++++++++++ 9 files changed, 291 insertions(+), 5 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStoreFactory.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8988f642426..d6e657aaefe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -645,6 +645,12 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_RM_CONFIGURATION_PROVIDER_CLASS = "org.apache.hadoop.yarn.LocalConfigurationProvider"; + public static final String SCHEDULER_CONFIGURATION_STORE_CLASS = + YARN_PREFIX + "scheduler.configuration.store.class"; + public static final String MEMORY_CONFIGURATION_STORE = "memory"; + public static final String DEFAULT_CONFIGURATION_STORE = + MEMORY_CONFIGURATION_STORE; + public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX + "authorization-provider"; private static final List RM_SERVICES_ADDRESS_CONF_KEYS_HTTP = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1c04e206211..247e87561cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3340,4 +3340,16 @@ + + + The type of configuration store to use for storing scheduler + configurations, if using a mutable configuration provider. + Keywords such as "memory" map to certain configuration store + implementations. If keyword is not found, try to load this + value as a class. + + yarn.scheduler.configuration.store.class + memory + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java new file mode 100644 index 00000000000..da30a2bf88a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.Map; + +/** + * Interface for allowing changing scheduler configurations. + */ +public interface MutableConfigurationProvider { + + /** + * Update the scheduler configuration with the provided key value pairs. + * @param user User issuing the request + * @param confUpdate Key-value pairs for configurations to be updated. + */ + void mutateConfiguration(String user, Map confUpdate); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 016d7eed5e4..ab602d11553 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -104,6 +104,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.CSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.FileBasedCSConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.MutableCSConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -295,10 +296,15 @@ public class CapacityScheduler extends String confProviderStr = configuration.get( CapacitySchedulerConfiguration.CS_CONF_PROVIDER, CapacitySchedulerConfiguration.DEFAULT_CS_CONF_PROVIDER); - if (confProviderStr.equals( - CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER)) { - this.csConfProvider = new FileBasedCSConfigurationProvider(rmContext); - } else { + switch (confProviderStr) { + case CapacitySchedulerConfiguration.FILE_CS_CONF_PROVIDER: + this.csConfProvider = + new FileBasedCSConfigurationProvider(rmContext); + break; + case CapacitySchedulerConfiguration.STORE_CS_CONF_PROVIDER: + this.csConfProvider = new MutableCSConfigurationProvider(rmContext); + break; + default: throw new IOException("Invalid CS configuration provider: " + confProviderStr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3821e24a74e..40cb893b6d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -322,6 +322,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String FILE_CS_CONF_PROVIDER = "file"; + @Private + public static final String STORE_CS_CONF_PROVIDER = "store"; + @Private public static final String DEFAULT_CS_CONF_PROVIDER = FILE_CS_CONF_PROVIDER; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java index c9984ac7378..0d2c8bb0040 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/CSConfigurationProvider.java @@ -32,8 +32,9 @@ public interface CSConfigurationProvider { /** * Initialize the configuration provider with given conf. * @param conf configuration to initialize with + * @throws IOException if initialization fails due to misconfiguration */ - void init(Configuration conf); + void init(Configuration conf) throws IOException; /** * Loads capacity scheduler configuration object. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java new file mode 100644 index 00000000000..267ab6a35cc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; + +import java.io.IOException; +import java.util.Map; + +/** + * CS configuration provider which implements + * {@link MutableConfigurationProvider} for modifying capacity scheduler + * configuration. + */ +public class MutableCSConfigurationProvider implements CSConfigurationProvider, + MutableConfigurationProvider { + + private Configuration schedConf; + private YarnConfigurationStore confStore; + private RMContext rmContext; + private Configuration conf; + + public MutableCSConfigurationProvider(RMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void init(Configuration config) throws IOException { + String store = config.get( + YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.DEFAULT_CONFIGURATION_STORE); + switch (store) { + case YarnConfiguration.MEMORY_CONFIGURATION_STORE: + this.confStore = new InMemoryConfigurationStore(); + break; + default: + this.confStore = YarnConfigurationStoreFactory.getStore(config); + break; + } + Configuration initialSchedConf = new Configuration(false); + initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE); + this.schedConf = initialSchedConf; + confStore.initialize(config, initialSchedConf); + this.conf = config; + } + + @Override + public CapacitySchedulerConfiguration loadConfiguration(Configuration + configuration) throws IOException { + Configuration loadedConf = new Configuration(configuration); + loadedConf.addResource(schedConf); + return new CapacitySchedulerConfiguration(loadedConf, false); + } + + @Override + public void mutateConfiguration(String user, + Map confUpdate) { + Configuration oldConf = new Configuration(schedConf); + LogMutation log = new LogMutation(confUpdate, user); + long id = confStore.logMutation(log); + for (Map.Entry kv : confUpdate.entrySet()) { + schedConf.set(kv.getKey(), kv.getValue()); + } + try { + rmContext.getScheduler().reinitialize(conf, rmContext); + } catch (IOException e) { + schedConf = oldConf; + confStore.confirmMutation(id, false); + return; + } + confStore.confirmMutation(id, true); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStoreFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStoreFactory.java new file mode 100644 index 00000000000..60249c849f9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStoreFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Factory class for creating instances of {@link YarnConfigurationStore}. + */ +public final class YarnConfigurationStoreFactory { + + private static final Log LOG = LogFactory.getLog( + YarnConfigurationStoreFactory.class); + + private YarnConfigurationStoreFactory() { + // Unused. + } + + public static YarnConfigurationStore getStore(Configuration conf) { + Class storeClass = + conf.getClass(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + InMemoryConfigurationStore.class, YarnConfigurationStore.class); + LOG.info("Using YarnConfigurationStore implementation - " + storeClass); + return ReflectionUtils.newInstance(storeClass, conf); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java new file mode 100644 index 00000000000..3f103b1bd87 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests {@link MutableCSConfigurationProvider}. + */ +public class TestMutableCSConfigurationProvider { + + private MutableCSConfigurationProvider confProvider; + private RMContext rmContext; + private Map goodUpdate; + private Map badUpdate; + private CapacityScheduler cs; + + private static final String TEST_USER = "testUser"; + + @Before + public void setUp() { + cs = mock(CapacityScheduler.class); + rmContext = mock(RMContext.class); + when(rmContext.getScheduler()).thenReturn(cs); + confProvider = new MutableCSConfigurationProvider(rmContext); + goodUpdate = new HashMap<>(); + goodUpdate.put("goodKey", "goodVal"); + badUpdate = new HashMap<>(); + badUpdate.put("badKey", "badVal"); + } + + @Test + public void testInMemoryBackedProvider() throws IOException { + Configuration conf = new Configuration(); + confProvider.init(conf); + assertNull(confProvider.loadConfiguration(conf) + .get("goodKey")); + + doNothing().when(cs).reinitialize(any(Configuration.class), + any(RMContext.class)); + confProvider.mutateConfiguration(TEST_USER, goodUpdate); + assertEquals("goodVal", confProvider.loadConfiguration(conf) + .get("goodKey")); + + assertNull(confProvider.loadConfiguration(conf).get("badKey")); + doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class), + any(RMContext.class)); + confProvider.mutateConfiguration(TEST_USER, badUpdate); + assertNull(confProvider.loadConfiguration(conf).get("badKey")); + } +}