YARN-10002. Code cleanup and improvements in ConfigurationStoreBaseTest. Contributed by Benjamin Teke

Szilard Nemeth 2020-03-10 16:35:04 +01:00
parent 9314ef947f
commit 61f4cf3055
10 changed files with 254 additions and 297 deletions

File: FSSchedulerConfigurationStore.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -336,6 +337,12 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
return null;
}
@Override
protected LinkedList<LogMutation> getLogs() {
// Unimplemented.
return null;
}
@Override
protected Version getConfStoreVersion() throws Exception {
return null;

File: InMemoryConfigurationStore.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -86,6 +87,12 @@ public class InMemoryConfigurationStore extends YarnConfigurationStore {
return null;
}
@Override
protected LinkedList<LogMutation> getLogs() {
// Unimplemented.
return null;
}
@Override
public Version getConfStoreVersion() throws Exception {
// Does nothing.

File: LeveldbConfigurationStore.java

@@ -333,6 +333,7 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
}
@VisibleForTesting
@Override
protected LinkedList<LogMutation> getLogs() throws Exception {
return deserLogMutations(db.get(bytes(LOG_KEY)));
}

File: YarnConfigurationStore.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -155,6 +156,13 @@ public abstract class YarnConfigurationStore {
*/
protected abstract Version getConfStoreVersion() throws Exception;
/**
* Get a list of configuration mutations.
* @return list of configuration mutations.
* @throws Exception On mutation fetch failure
*/
protected abstract LinkedList<LogMutation> getLogs() throws Exception;
/**
* Persist the hard-coded schema version to the conf store.
* @throws Exception On storage failure

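For context, a minimal sketch (not part of this patch) of how a concrete store that already keeps its audit trail in memory could satisfy the new getLogs() contract; the logMutations field name is an illustrative assumption.

// Illustrative fragment, assuming the store tracks mutations in a LinkedList;
// the real implementations shown below (LevelDB, ZK) instead deserialize the
// list from their backing storage.
private LinkedList<LogMutation> logMutations = new LinkedList<>();

@Override
protected LinkedList<LogMutation> getLogs() throws Exception {
  return logMutations;
}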
File: ZKConfigurationStore.java

@@ -119,6 +119,7 @@ public class ZKConfigurationStore extends YarnConfigurationStore {
}
@VisibleForTesting
@Override
protected LinkedList<LogMutation> getLogs() throws Exception {
return (LinkedList<LogMutation>)
deserializeObject(zkManager.getData(logsPath));

File: ConfigurationStoreBaseTest.java

@@ -34,16 +34,13 @@ import static org.junit.Assert.assertNull;
* Base test class for {@link YarnConfigurationStore} implementations.
*/
public abstract class ConfigurationStoreBaseTest {
static final String TEST_USER = "testUser";
YarnConfigurationStore confStore = createConfStore();
Configuration conf;
Configuration schedConf;
RMContext rmContext;
protected YarnConfigurationStore confStore = createConfStore();
protected abstract YarnConfigurationStore createConfStore();
protected Configuration conf;
protected Configuration schedConf;
protected RMContext rmContext;
protected static final String TEST_USER = "testUser";
abstract YarnConfigurationStore createConfStore();
@Before
public void setUp() throws Exception {
@@ -59,20 +56,12 @@ public abstract class ConfigurationStoreBaseTest {
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val1", confStore.retrieve().get("key1"));
Map<String, String> update1 = new HashMap<>();
update1.put("keyUpdate1", "valUpdate1");
YarnConfigurationStore.LogMutation mutation1 =
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
confStore.logMutation(mutation1);
confStore.confirmMutation(mutation1, true);
confStore.confirmMutation(prepareLogMutation("keyUpdate1", "valUpdate1"),
true);
assertEquals("valUpdate1", confStore.retrieve().get("keyUpdate1"));
Map<String, String> update2 = new HashMap<>();
update2.put("keyUpdate2", "valUpdate2");
YarnConfigurationStore.LogMutation mutation2 =
new YarnConfigurationStore.LogMutation(update2, TEST_USER);
confStore.logMutation(mutation2);
confStore.confirmMutation(mutation2, false);
confStore.confirmMutation(prepareLogMutation("keyUpdate2", "valUpdate2"),
false);
assertNull("Configuration should not be updated",
confStore.retrieve().get("keyUpdate2"));
confStore.close();
@@ -84,13 +73,20 @@ public abstract class ConfigurationStoreBaseTest {
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
Map<String, String> update = new HashMap<>();
update.put("key", null);
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
confStore.confirmMutation(mutation, true);
confStore.confirmMutation(prepareLogMutation("key", null), true);
assertNull(confStore.retrieve().get("key"));
confStore.close();
}
YarnConfigurationStore.LogMutation prepareLogMutation(String key,
String value)
throws Exception {
Map<String, String> update = new HashMap<>();
update.put(key, value);
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
return mutation;
}
}
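For illustration, a minimal sketch of how a concrete, non-persistent test plugs into the refactored base class; the subclass shown here follows the in-memory store and is only a sketch of the pattern, not necessarily the exact test class in the patch.

public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {

  @Override
  YarnConfigurationStore createConfStore() {
    // Wires the in-memory store into the shared setUp/test logic of the base.
    return new InMemoryConfigurationStore();
  }
}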

File: PersistentConfigurationStoreBaseTest.java (new)

@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.junit.Test;
import java.util.LinkedList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Base test class for the persistent {@link YarnConfigurationStore}
* implementations, extended by {@link TestLeveldbConfigurationStore} and
* {@link TestZKConfigurationStore}.
*/
public abstract class PersistentConfigurationStoreBaseTest extends
ConfigurationStoreBaseTest {
abstract Version getVersion();
@Test
public void testGetConfigurationVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
long v1 = confStore.getConfigVersion();
assertEquals(1, v1);
confStore.confirmMutation(prepareLogMutation("keyver", "valver"), true);
long v2 = confStore.getConfigVersion();
assertEquals(2, v2);
confStore.close();
}
@Test
public void testPersistConfiguration() throws Exception {
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
// Create a new configuration store, and check for old configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
}
@Test
public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.retrieve().get("key"));
confStore.confirmMutation(prepareLogMutation("key", "val"), true);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
// Create a new configuration store, and check for updated configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
}
@Test
public void testVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.getConfStoreVersion());
confStore.checkVersion();
assertEquals(getVersion(),
confStore.getConfStoreVersion());
confStore.close();
}
@Test
public void testMaxLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
confStore.initialize(conf, schedConf, rmContext);
LinkedList<YarnConfigurationStore.LogMutation> logs = confStore.getLogs();
assertEquals(0, logs.size());
YarnConfigurationStore.LogMutation mutation =
prepareLogMutation("key1", "val1");
logs = confStore.getLogs();
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
confStore.confirmMutation(mutation, true);
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
mutation = prepareLogMutation("key2", "val2");
logs = confStore.getLogs();
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
// Next update should purge first update from logs.
mutation = prepareLogMutation("key3", "val3");
logs = confStore.getLogs();
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.close();
}
}
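The testMaxLogs case above depends on each store trimming its audit log to YarnConfiguration.RM_SCHEDCONF_MAX_LOGS entries as mutations are logged. A standalone sketch of that purge behavior in plain Java (class and method names here are illustrative, not taken from the patch):

import java.util.LinkedList;
import java.util.Map;

// Illustrative stand-in for the bounded audit trail the real stores maintain.
class BoundedMutationLog {
  private final LinkedList<Map<String, String>> logs = new LinkedList<>();
  private final int maxLogs;

  BoundedMutationLog(int maxLogs) {
    this.maxLogs = maxLogs;
  }

  void append(Map<String, String> update) {
    logs.add(update);
    // Drop the oldest entries once the limit is exceeded; with a limit of 2
    // this is why the third mutation in testMaxLogs evicts the first one.
    while (logs.size() > maxLogs) {
      logs.removeFirst();
    }
  }

  LinkedList<Map<String, String>> getLogs() {
    return logs;
  }
}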

File: TestFSSchedulerConfigurationStore.java

@@ -32,19 +32,22 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
/**
* Tests {@link FSSchedulerConfigurationStore}.
*/
public class TestFSSchedulerConfigurationStore {
private static final String TEST_USER = "test";
private FSSchedulerConfigurationStore configurationStore;
private Configuration conf;
private File testSchedulerConfigurationDir;
@@ -90,35 +93,29 @@ public class TestFSSchedulerConfigurationStore {
Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
Map<String, String> updates = new HashMap<>();
updates.put("a", null);
updates.put("b", "bb");
Configuration expectConfig = new Configuration(conf);
expectConfig.unset("a");
expectConfig.set("b", "bb");
LogMutation logMutation = new LogMutation(updates, "test");
configurationStore.logMutation(logMutation);
configurationStore.confirmMutation(logMutation, true);
prepareParameterizedLogMutation(configurationStore, true,
"a", null, "b", "bb");
storeConf = configurationStore.retrieve();
assertEquals(null, storeConf.get("a"));
assertNull(storeConf.get("a"));
assertEquals("bb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
compareConfig(expectConfig, storeConf);
updates.put("b", "bbb");
configurationStore.logMutation(logMutation);
configurationStore.confirmMutation(logMutation, true);
prepareParameterizedLogMutation(configurationStore, true,
"a", null, "b", "bbb");
storeConf = configurationStore.retrieve();
assertEquals(null, storeConf.get("a"));
assertNull(storeConf.get("a"));
assertEquals("bbb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
}
@Test
public void confirmMutationWithInValid() throws Exception {
public void confirmMutationWithInvalid() throws Exception {
conf.set("a", "a");
conf.set("b", "b");
conf.set("c", "c");
@@ -127,13 +124,8 @@ public class TestFSSchedulerConfigurationStore {
Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
Map<String, String> updates = new HashMap<>();
updates.put("a", null);
updates.put("b", "bb");
LogMutation logMutation = new LogMutation(updates, "test");
configurationStore.logMutation(logMutation);
configurationStore.confirmMutation(logMutation, false);
prepareParameterizedLogMutation(configurationStore, false,
"a", null, "b", "bb");
storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
@@ -142,7 +134,7 @@ public class TestFSSchedulerConfigurationStore {
@Test
public void testFileSystemClose() throws Exception {
MiniDFSCluster hdfsCluster = null;
FileSystem fs = null;
FileSystem fs;
Path path = new Path("/tmp/confstore");
try {
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
@@ -164,11 +156,8 @@ public class TestFSSchedulerConfigurationStore {
fs.close();
try {
Map<String, String> updates = new HashMap<>();
updates.put("testkey", "testvalue");
LogMutation logMutation = new LogMutation(updates, "test");
configStore.logMutation(logMutation);
configStore.confirmMutation(logMutation, true);
prepareParameterizedLogMutation(configStore, true,
"testkey", "testvalue");
} catch (IOException e) {
if (e.getMessage().contains("Filesystem closed")) {
fail("FSSchedulerConfigurationStore failed to handle " +
@@ -178,13 +167,12 @@ public class TestFSSchedulerConfigurationStore {
}
}
} finally {
assert hdfsCluster != null;
fs = hdfsCluster.getFileSystem();
if (fs.exists(path)) {
fs.delete(path, true);
}
if (hdfsCluster != null) {
hdfsCluster.shutdown();
}
hdfsCluster.shutdown();
}
}
@@ -197,15 +185,14 @@ public class TestFSSchedulerConfigurationStore {
Configuration storedConfig = configurationStore.retrieve();
assertEquals("a", storedConfig.get("a"));
configurationStore.format();
boolean exceptionCaught = false;
try {
storedConfig = configurationStore.retrieve();
configurationStore.retrieve();
fail("Expected an IOException with message containing \"no capacity " +
"scheduler file in\" to be thrown");
} catch (IOException e) {
if (e.getMessage().contains("no capacity scheduler file in")) {
exceptionCaught = true;
}
assertThat(e.getMessage(),
CoreMatchers.containsString("no capacity scheduler file in"));
}
assertTrue(exceptionCaught);
}
@Test
@@ -243,4 +230,27 @@ public class TestFSSchedulerConfigurationStore {
schedulerConf.get(entry.getKey()));
}
}
private void prepareParameterizedLogMutation(
FSSchedulerConfigurationStore configStore,
boolean validityFlag, String... values) throws Exception {
Map<String, String> updates = new HashMap<>();
String key;
String value;
if (values.length % 2 != 0) {
throw new IllegalArgumentException("The number of parameters should be " +
"even.");
}
for (int i = 1; i <= values.length; i += 2) {
key = values[i - 1];
value = values[i];
updates.put(key, value);
}
LogMutation logMutation = new LogMutation(updates, TEST_USER);
configStore.logMutation(logMutation);
configStore.confirmMutation(logMutation, validityFlag);
}
}
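The varargs helper above takes alternating key/value pairs, so call sites read as a flat list of updates, and an odd number of arguments fails fast with IllegalArgumentException. A usage sketch matching the tests above:

// Removes "a" (null value) and sets "b" to "bb" in a single confirmed mutation.
prepareParameterizedLogMutation(configurationStore, true, "a", null, "b", "bb");

// Rejected: keys and values must come in pairs.
// prepareParameterizedLogMutation(configurationStore, true, "a", null, "b");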

File: TestLeveldbConfigurationStore.java

@@ -38,8 +38,6 @@ import org.iq80.leveldb.DBIterator;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -49,7 +47,8 @@ import static org.junit.Assert.assertNull;
/**
* Tests {@link LeveldbConfigurationStore}.
*/
public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
public class TestLeveldbConfigurationStore extends
PersistentConfigurationStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestLeveldbConfigurationStore.class);
@@ -58,8 +57,6 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
System.getProperty("java.io.tmpdir")),
TestLeveldbConfigurationStore.class.getName());
private ResourceManager rm;
@Before
public void setUp() throws Exception {
super.setUp();
@@ -69,16 +66,6 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
conf.set(YarnConfiguration.RM_SCHEDCONF_STORE_PATH, TEST_DIR.toString());
}
@Test
public void testVersioning() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.getConfStoreVersion());
confStore.checkVersion();
assertEquals(LeveldbConfigurationStore.CURRENT_VERSION_INFO,
confStore.getConfStoreVersion());
confStore.close();
}
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
public void testIncompatibleVersion() throws Exception {
try {
@@ -96,55 +83,12 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
}
}
@Test
public void testPersistConfiguration() throws Exception {
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
// Create a new configuration store, and check for old configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
}
@Test
public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.retrieve().get("key"));
Map<String, String> update = new HashMap<>();
update.put("key", "val");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
confStore.confirmMutation(mutation, true);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
// Create a new configuration store, and check for updated configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
confStore.close();
}
@Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
confStore.initialize(conf, schedConf, rmContext);
Map<String, String> update = new HashMap<>();
update.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
prepareLogMutation("key1", "val1");
boolean logKeyPresent = false;
DB db = ((LeveldbConfigurationStore) confStore).getDB();
@@ -162,55 +106,6 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
confStore.close();
}
@Test
public void testMaxLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
confStore.initialize(conf, schedConf, rmContext);
LinkedList<YarnConfigurationStore.LogMutation> logs =
((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(0, logs.size());
Map<String, String> update1 = new HashMap<>();
update1.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
confStore.logMutation(mutation);
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
confStore.confirmMutation(mutation, true);
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
Map<String, String> update2 = new HashMap<>();
update2.put("key2", "val2");
mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
confStore.logMutation(mutation);
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
// Next update should purge first update from logs.
Map<String, String> update3 = new HashMap<>();
update3.put("key3", "val3");
mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
confStore.logMutation(mutation);
logs = ((LeveldbConfigurationStore) confStore).getLogs();
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.close();
}
/**
* When restarting, RM should read from current state of store, including
* any updates from the previous RM instance.
@@ -240,7 +135,7 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
.getConfStore().retrieve().get("key"));
// Next update is not persisted, it should not be recovered
schedConfUpdateInfo.getGlobalParams().put("key", "badVal");
log = confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
confProvider.logAndApplyMutation(user, schedConfUpdateInfo);
rm1.close();
// Start RM2 and verifies it starts with updated configuration
@@ -258,4 +153,10 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
public YarnConfigurationStore createConfStore() {
return new LeveldbConfigurationStore();
}
@Override
Version getVersion() {
return LeveldbConfigurationStore.CURRENT_VERSION_INFO;
}
}

File: TestZKConfigurationStore.java

@@ -18,13 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
@@ -34,8 +27,11 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
@@ -45,14 +41,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -63,8 +62,8 @@ import static org.junit.Assert.assertTrue;
/**
* Tests {@link ZKConfigurationStore}.
*/
public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
public class TestZKConfigurationStore extends
PersistentConfigurationStoreBaseTest {
public static final Logger LOG =
LoggerFactory.getLogger(TestZKConfigurationStore.class);
@@ -90,6 +89,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
curatorTestingServer = setupCuratorServer();
@@ -109,15 +109,6 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
curatorTestingServer.stop();
}
@Test
public void testVersioning() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.getConfStoreVersion());
confStore.checkVersion();
assertEquals(ZKConfigurationStore.CURRENT_VERSION_INFO,
confStore.getConfStoreVersion());
}
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
public void testIncompatibleVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
@@ -141,23 +132,6 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
confStore.checkVersion();
}
@Test
public void testPersistConfiguration() throws Exception {
schedConf.set("key", "val");
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
assertNull(confStore.retrieve().get(YarnConfiguration.RM_HOSTNAME));
// Create a new configuration store, and check for old configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
}
@Test
public void testFormatConfiguration() throws Exception {
schedConf.set("key", "val");
@@ -167,90 +141,6 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
assertNull(confStore.retrieve());
}
@Test
public void testGetConfigurationVersion() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
long v1 = confStore.getConfigVersion();
assertEquals(1, v1);
Map<String, String> update = new HashMap<>();
update.put("keyver", "valver");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
confStore.confirmMutation(mutation, true);
long v2 = confStore.getConfigVersion();
assertEquals(2, v2);
}
@Test
public void testPersistUpdatedConfiguration() throws Exception {
confStore.initialize(conf, schedConf, rmContext);
assertNull(confStore.retrieve().get("key"));
Map<String, String> update = new HashMap<>();
update.put("key", "val");
LogMutation mutation = new LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
confStore.confirmMutation(mutation, true);
assertEquals("val", confStore.retrieve().get("key"));
// Create a new configuration store, and check for updated configuration
confStore = createConfStore();
schedConf.set("key", "badVal");
// Should ignore passed-in scheduler configuration.
confStore.initialize(conf, schedConf, rmContext);
assertEquals("val", confStore.retrieve().get("key"));
}
@Test
public void testMaxLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
confStore.initialize(conf, schedConf, rmContext);
LinkedList<YarnConfigurationStore.LogMutation> logs =
((ZKConfigurationStore) confStore).getLogs();
assertEquals(0, logs.size());
Map<String, String> update1 = new HashMap<>();
update1.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update1, TEST_USER);
confStore.logMutation(mutation);
logs = ((ZKConfigurationStore) confStore).getLogs();
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
confStore.confirmMutation(mutation, true);
assertEquals(1, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
Map<String, String> update2 = new HashMap<>();
update2.put("key2", "val2");
mutation = new YarnConfigurationStore.LogMutation(update2, TEST_USER);
confStore.logMutation(mutation);
logs = ((ZKConfigurationStore) confStore).getLogs();
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val1", logs.get(0).getUpdates().get("key1"));
assertEquals("val2", logs.get(1).getUpdates().get("key2"));
// Next update should purge first update from logs.
Map<String, String> update3 = new HashMap<>();
update3.put("key3", "val3");
mutation = new YarnConfigurationStore.LogMutation(update3, TEST_USER);
confStore.logMutation(mutation);
logs = ((ZKConfigurationStore) confStore).getLogs();
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
confStore.confirmMutation(mutation, true);
assertEquals(2, logs.size());
assertEquals("val2", logs.get(0).getUpdates().get("key2"));
assertEquals("val3", logs.get(1).getUpdates().get("key3"));
}
@Test
public void testDisableAuditLogs() throws Exception {
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 0);
@@ -262,11 +152,7 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
byte[] data = null;
((ZKConfigurationStore) confStore).zkManager.setData(logsPath, data, -1);
Map<String, String> update = new HashMap<>();
update.put("key1", "val1");
YarnConfigurationStore.LogMutation mutation =
new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
prepareLogMutation("key1", "val1");
data = ((ZKConfigurationStore) confStore).zkManager.getData(logsPath);
assertNull("Failed to Disable Audit Logs", data);
@@ -489,4 +375,9 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
public YarnConfigurationStore createConfStore() {
return new ZKConfigurationStore();
}
@Override
Version getVersion() {
return ZKConfigurationStore.CURRENT_VERSION_INFO;
}
}