YARN-9999. TestFSSchedulerConfigurationStore: Extend from ConfigurationStoreBaseTest, general code cleanup. Contributed by Benjamin Teke
This commit is contained in:
parent
497c7a1680
commit
61ca459c74
|
@ -78,13 +78,24 @@ public abstract class ConfigurationStoreBaseTest {
|
||||||
confStore.close();
|
confStore.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
YarnConfigurationStore.LogMutation prepareLogMutation(String key,
|
YarnConfigurationStore.LogMutation prepareLogMutation(String... values)
|
||||||
String value)
|
|
||||||
throws Exception {
|
throws Exception {
|
||||||
Map<String, String> update = new HashMap<>();
|
Map<String, String> updates = new HashMap<>();
|
||||||
update.put(key, value);
|
String key;
|
||||||
|
String value;
|
||||||
|
|
||||||
|
if (values.length % 2 != 0) {
|
||||||
|
throw new IllegalArgumentException("The number of parameters should be " +
|
||||||
|
"even.");
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 1; i <= values.length; i += 2) {
|
||||||
|
key = values[i - 1];
|
||||||
|
value = values[i];
|
||||||
|
updates.put(key, value);
|
||||||
|
}
|
||||||
YarnConfigurationStore.LogMutation mutation =
|
YarnConfigurationStore.LogMutation mutation =
|
||||||
new YarnConfigurationStore.LogMutation(update, TEST_USER);
|
new YarnConfigurationStore.LogMutation(updates, TEST_USER);
|
||||||
confStore.logMutation(mutation);
|
confStore.logMutation(mutation);
|
||||||
|
|
||||||
return mutation;
|
return mutation;
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.LinkedList;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assume.assumeFalse;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for the persistent {@link YarnConfigurationStore}
|
* Base class for the persistent {@link YarnConfigurationStore}
|
||||||
|
@ -94,6 +95,9 @@ public abstract class PersistentConfigurationStoreBaseTest extends
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaxLogs() throws Exception {
|
public void testMaxLogs() throws Exception {
|
||||||
|
assumeFalse("test should be skipped for TestFSSchedulerConfigurationStore",
|
||||||
|
this instanceof TestFSSchedulerConfigurationStore);
|
||||||
|
|
||||||
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
|
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
|
||||||
confStore.initialize(conf, schedConf, rmContext);
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
LinkedList<YarnConfigurationStore.LogMutation> logs = confStore.getLogs();
|
LinkedList<YarnConfigurationStore.LogMutation> logs = confStore.getLogs();
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
@ -31,36 +30,34 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.hamcrest.CoreMatchers;
|
import org.hamcrest.CoreMatchers;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link FSSchedulerConfigurationStore}.
|
* Tests {@link FSSchedulerConfigurationStore}.
|
||||||
*/
|
*/
|
||||||
public class TestFSSchedulerConfigurationStore {
|
public class TestFSSchedulerConfigurationStore extends
|
||||||
private static final String TEST_USER = "test";
|
PersistentConfigurationStoreBaseTest {
|
||||||
private FSSchedulerConfigurationStore configurationStore;
|
|
||||||
private Configuration conf;
|
|
||||||
private File testSchedulerConfigurationDir;
|
private File testSchedulerConfigurationDir;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
configurationStore = new FSSchedulerConfigurationStore();
|
super.setUp();
|
||||||
testSchedulerConfigurationDir = new File(
|
testSchedulerConfigurationDir = new File(
|
||||||
TestFSSchedulerConfigurationStore.class.getResource("").getPath()
|
TestFSSchedulerConfigurationStore.class.getResource("").getPath()
|
||||||
+ FSSchedulerConfigurationStore.class.getSimpleName());
|
+ FSSchedulerConfigurationStore.class.getSimpleName());
|
||||||
testSchedulerConfigurationDir.mkdirs();
|
testSchedulerConfigurationDir.mkdirs();
|
||||||
|
|
||||||
conf = new Configuration();
|
|
||||||
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
||||||
testSchedulerConfigurationDir.getAbsolutePath());
|
testSchedulerConfigurationDir.getAbsolutePath());
|
||||||
}
|
}
|
||||||
|
@ -81,34 +78,41 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
FileUtils.deleteDirectory(testSchedulerConfigurationDir);
|
FileUtils.deleteDirectory(testSchedulerConfigurationDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void checkVersion() {
|
||||||
|
try {
|
||||||
|
confStore.checkVersion();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("checkVersion throw exception");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void confirmMutationWithValid() throws Exception {
|
public void confirmMutationWithValid() throws Exception {
|
||||||
conf.setInt(
|
conf.setInt(
|
||||||
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, 2);
|
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, 2);
|
||||||
conf.set("a", "a");
|
conf.set("a", "a");
|
||||||
conf.set("b", "b");
|
conf.set("b", "b");
|
||||||
conf.set("c", "c");
|
conf.set("c", "c");
|
||||||
writeConf(conf);
|
writeConf(conf);
|
||||||
configurationStore.initialize(conf, conf, null);
|
confStore.initialize(conf, conf, null);
|
||||||
Configuration storeConf = configurationStore.retrieve();
|
Configuration storeConf = confStore.retrieve();
|
||||||
compareConfig(conf, storeConf);
|
compareConfig(conf, storeConf);
|
||||||
|
|
||||||
Configuration expectConfig = new Configuration(conf);
|
Configuration expectConfig = new Configuration(conf);
|
||||||
expectConfig.unset("a");
|
expectConfig.unset("a");
|
||||||
expectConfig.set("b", "bb");
|
expectConfig.set("b", "bb");
|
||||||
|
|
||||||
prepareParameterizedLogMutation(configurationStore, true,
|
confStore.confirmMutation(prepareLogMutation("a", null, "b", "bb"), true);
|
||||||
"a", null, "b", "bb");
|
storeConf = confStore.retrieve();
|
||||||
storeConf = configurationStore.retrieve();
|
|
||||||
assertNull(storeConf.get("a"));
|
assertNull(storeConf.get("a"));
|
||||||
assertEquals("bb", storeConf.get("b"));
|
assertEquals("bb", storeConf.get("b"));
|
||||||
assertEquals("c", storeConf.get("c"));
|
assertEquals("c", storeConf.get("c"));
|
||||||
|
|
||||||
compareConfig(expectConfig, storeConf);
|
compareConfig(expectConfig, storeConf);
|
||||||
|
|
||||||
prepareParameterizedLogMutation(configurationStore, true,
|
confStore.confirmMutation(prepareLogMutation("a", null, "b", "bbb"), true);
|
||||||
"a", null, "b", "bbb");
|
storeConf = confStore.retrieve();
|
||||||
storeConf = configurationStore.retrieve();
|
|
||||||
assertNull(storeConf.get("a"));
|
assertNull(storeConf.get("a"));
|
||||||
assertEquals("bbb", storeConf.get("b"));
|
assertEquals("bbb", storeConf.get("b"));
|
||||||
assertEquals("c", storeConf.get("c"));
|
assertEquals("c", storeConf.get("c"));
|
||||||
|
@ -120,17 +124,51 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
conf.set("b", "b");
|
conf.set("b", "b");
|
||||||
conf.set("c", "c");
|
conf.set("c", "c");
|
||||||
writeConf(conf);
|
writeConf(conf);
|
||||||
configurationStore.initialize(conf, conf, null);
|
confStore.initialize(conf, conf, null);
|
||||||
Configuration storeConf = configurationStore.retrieve();
|
Configuration storeConf = confStore.retrieve();
|
||||||
compareConfig(conf, storeConf);
|
compareConfig(conf, storeConf);
|
||||||
|
|
||||||
prepareParameterizedLogMutation(configurationStore, false,
|
confStore.confirmMutation(prepareLogMutation("a", null, "b", "bb"), false);
|
||||||
"a", null, "b", "bb");
|
storeConf = confStore.retrieve();
|
||||||
storeConf = configurationStore.retrieve();
|
|
||||||
|
|
||||||
compareConfig(conf, storeConf);
|
compareConfig(conf, storeConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConfigRetrieval() throws Exception {
|
||||||
|
Configuration schedulerConf = new Configuration();
|
||||||
|
schedulerConf.set("a", "a");
|
||||||
|
schedulerConf.setLong("long", 1L);
|
||||||
|
schedulerConf.setBoolean("boolean", true);
|
||||||
|
writeConf(schedulerConf);
|
||||||
|
|
||||||
|
confStore.initialize(conf, conf, null);
|
||||||
|
Configuration storedConfig = confStore.retrieve();
|
||||||
|
|
||||||
|
compareConfig(schedulerConf, storedConfig);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFormatConfiguration() throws Exception {
|
||||||
|
Configuration persistedSchedConf = new Configuration();
|
||||||
|
persistedSchedConf.set("a", "a");
|
||||||
|
writeConf(persistedSchedConf);
|
||||||
|
confStore.initialize(conf, conf, null);
|
||||||
|
Configuration storedConfig = confStore.retrieve();
|
||||||
|
assertEquals("Retrieved config should match the stored one", "a",
|
||||||
|
storedConfig.get("a"));
|
||||||
|
confStore.format();
|
||||||
|
try {
|
||||||
|
confStore.retrieve();
|
||||||
|
fail("Expected an IOException with message containing \"no capacity " +
|
||||||
|
"scheduler file in\" to be thrown");
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertThat("Exception message should contain the predefined string.",
|
||||||
|
e.getMessage(),
|
||||||
|
CoreMatchers.containsString("no capacity scheduler file in"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileSystemClose() throws Exception {
|
public void testFileSystemClose() throws Exception {
|
||||||
MiniDFSCluster hdfsCluster = null;
|
MiniDFSCluster hdfsCluster = null;
|
||||||
|
@ -146,18 +184,15 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
fs.mkdirs(path);
|
fs.mkdirs(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
FSSchedulerConfigurationStore configStore =
|
|
||||||
new FSSchedulerConfigurationStore();
|
|
||||||
hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
||||||
path.toString());
|
path.toString());
|
||||||
configStore.initialize(hdfsConfig, hdfsConfig, null);
|
confStore.initialize(hdfsConfig, hdfsConfig, null);
|
||||||
|
|
||||||
// Close the FileSystem object and validate
|
// Close the FileSystem object and validate
|
||||||
fs.close();
|
fs.close();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
prepareParameterizedLogMutation(configStore, true,
|
confStore.confirmMutation(prepareLogMutation("key", "val"), true);
|
||||||
"testkey", "testvalue");
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (e.getMessage().contains("Filesystem closed")) {
|
if (e.getMessage().contains("Filesystem closed")) {
|
||||||
fail("FSSchedulerConfigurationStore failed to handle " +
|
fail("FSSchedulerConfigurationStore failed to handle " +
|
||||||
|
@ -176,50 +211,8 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testFormatConfiguration() throws Exception {
|
|
||||||
Configuration schedulerConf = new Configuration();
|
|
||||||
schedulerConf.set("a", "a");
|
|
||||||
writeConf(schedulerConf);
|
|
||||||
configurationStore.initialize(conf, conf, null);
|
|
||||||
Configuration storedConfig = configurationStore.retrieve();
|
|
||||||
assertEquals("a", storedConfig.get("a"));
|
|
||||||
configurationStore.format();
|
|
||||||
try {
|
|
||||||
configurationStore.retrieve();
|
|
||||||
fail("Expected an IOException with message containing \"no capacity " +
|
|
||||||
"scheduler file in\" to be thrown");
|
|
||||||
} catch (IOException e) {
|
|
||||||
assertThat(e.getMessage(),
|
|
||||||
CoreMatchers.containsString("no capacity scheduler file in"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void retrieve() throws Exception {
|
|
||||||
Configuration schedulerConf = new Configuration();
|
|
||||||
schedulerConf.set("a", "a");
|
|
||||||
schedulerConf.setLong("long", 1L);
|
|
||||||
schedulerConf.setBoolean("boolean", true);
|
|
||||||
writeConf(schedulerConf);
|
|
||||||
|
|
||||||
configurationStore.initialize(conf, conf, null);
|
|
||||||
Configuration storedConfig = configurationStore.retrieve();
|
|
||||||
|
|
||||||
compareConfig(schedulerConf, storedConfig);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void checkVersion() {
|
|
||||||
try {
|
|
||||||
configurationStore.checkVersion();
|
|
||||||
} catch (Exception e) {
|
|
||||||
fail("checkVersion throw exception");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void compareConfig(Configuration schedulerConf,
|
private void compareConfig(Configuration schedulerConf,
|
||||||
Configuration storedConfig) {
|
Configuration storedConfig) {
|
||||||
for (Map.Entry<String, String> entry : schedulerConf) {
|
for (Map.Entry<String, String> entry : schedulerConf) {
|
||||||
assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
|
assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
|
||||||
storedConfig.get(entry.getKey()));
|
storedConfig.get(entry.getKey()));
|
||||||
|
@ -231,26 +224,13 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void prepareParameterizedLogMutation(
|
@Override
|
||||||
FSSchedulerConfigurationStore configStore,
|
public YarnConfigurationStore createConfStore() {
|
||||||
boolean validityFlag, String... values) throws Exception {
|
return new FSSchedulerConfigurationStore();
|
||||||
Map<String, String> updates = new HashMap<>();
|
}
|
||||||
String key;
|
|
||||||
String value;
|
|
||||||
|
|
||||||
if (values.length % 2 != 0) {
|
@Override
|
||||||
throw new IllegalArgumentException("The number of parameters should be " +
|
Version getVersion() {
|
||||||
"even.");
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 1; i <= values.length; i += 2) {
|
|
||||||
key = values[i - 1];
|
|
||||||
value = values[i];
|
|
||||||
updates.put(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
LogMutation logMutation = new LogMutation(updates, TEST_USER);
|
|
||||||
configStore.logMutation(logMutation);
|
|
||||||
configStore.confirmMutation(logMutation, validityFlag);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue