YARN-9875. Improve fair scheduler configuration store on HDFS.
Contributed by Prabhu Joseph
This commit is contained in:
parent
72b1bed998
commit
155864da00
|
@ -66,7 +66,7 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
private Path configVersionFile;
|
private Path configVersionFile;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Configuration conf, Configuration vSchedConf,
|
public void initialize(Configuration fsConf, Configuration vSchedConf,
|
||||||
RMContext rmContext) throws Exception {
|
RMContext rmContext) throws Exception {
|
||||||
this.configFilePathFilter = new PathFilter() {
|
this.configFilePathFilter = new PathFilter() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -80,6 +80,7 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Configuration conf = new Configuration(fsConf);
|
||||||
String schedulerConfPathStr = conf.get(
|
String schedulerConfPathStr = conf.get(
|
||||||
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH);
|
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH);
|
||||||
if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
|
if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
|
||||||
|
@ -88,6 +89,15 @@ public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
|
||||||
+ " must be set");
|
+ " must be set");
|
||||||
}
|
}
|
||||||
this.schedulerConfDir = new Path(schedulerConfPathStr);
|
this.schedulerConfDir = new Path(schedulerConfPathStr);
|
||||||
|
String scheme = schedulerConfDir.toUri().getScheme();
|
||||||
|
if (scheme == null) {
|
||||||
|
scheme = FileSystem.getDefaultUri(conf).getScheme();
|
||||||
|
}
|
||||||
|
if (scheme != null) {
|
||||||
|
String disableCacheName = String.format("fs.%s.impl.disable.cache",
|
||||||
|
scheme);
|
||||||
|
conf.setBoolean(disableCacheName, true);
|
||||||
|
}
|
||||||
this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
|
this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
|
||||||
this.maxVersion = conf.getInt(
|
this.maxVersion = conf.getInt(
|
||||||
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION,
|
YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION,
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -137,6 +139,55 @@ public class TestFSSchedulerConfigurationStore {
|
||||||
compareConfig(conf, storeConf);
|
compareConfig(conf, storeConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileSystemClose() throws Exception {
|
||||||
|
MiniDFSCluster hdfsCluster = null;
|
||||||
|
FileSystem fs = null;
|
||||||
|
Path path = new Path("/tmp/confstore");
|
||||||
|
try {
|
||||||
|
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
|
||||||
|
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
|
||||||
|
.numDataNodes(1).build();
|
||||||
|
|
||||||
|
fs = hdfsCluster.getFileSystem();
|
||||||
|
if (!fs.exists(path)) {
|
||||||
|
fs.mkdirs(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
FSSchedulerConfigurationStore configStore =
|
||||||
|
new FSSchedulerConfigurationStore();
|
||||||
|
hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
|
||||||
|
path.toString());
|
||||||
|
configStore.initialize(hdfsConfig, hdfsConfig, null);
|
||||||
|
|
||||||
|
// Close the FileSystem object and validate
|
||||||
|
fs.close();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Map<String, String> updates = new HashMap<>();
|
||||||
|
updates.put("testkey", "testvalue");
|
||||||
|
LogMutation logMutation = new LogMutation(updates, "test");
|
||||||
|
configStore.logMutation(logMutation);
|
||||||
|
configStore.confirmMutation(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (e.getMessage().contains("Filesystem closed")) {
|
||||||
|
fail("FSSchedulerConfigurationStore failed to handle " +
|
||||||
|
"FileSystem close");
|
||||||
|
} else {
|
||||||
|
fail("Should not get any exceptions");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
fs = hdfsCluster.getFileSystem();
|
||||||
|
if (fs.exists(path)) {
|
||||||
|
fs.delete(path, true);
|
||||||
|
}
|
||||||
|
if (hdfsCluster != null) {
|
||||||
|
hdfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFormatConfiguration() throws Exception {
|
public void testFormatConfiguration() throws Exception {
|
||||||
Configuration schedulerConf = new Configuration();
|
Configuration schedulerConf = new Configuration();
|
||||||
|
|
Loading…
Reference in New Issue