YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter)

This commit is contained in:
Robert Kanter 2018-01-10 15:03:33 -08:00
parent 679ad2ced0
commit ff67c68d3c
2 changed files with 143 additions and 67 deletions

View File

@ -17,25 +17,16 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File; import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import com.google.common.base.CharMatcher;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
@ -46,8 +37,8 @@ import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -58,8 +49,17 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Text; import org.w3c.dom.Text;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import com.google.common.base.CharMatcher; import javax.xml.parsers.DocumentBuilder;
import com.google.common.annotations.VisibleForTesting; import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Public @Public
@Unstable @Unstable
@ -79,6 +79,9 @@ public class AllocationFileLoaderService extends AbstractService {
public static final long THREAD_JOIN_TIMEOUT_MS = 1000; public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
//Permitted allocation file filesystems (case insensitive)
private static final String SUPPORTED_FS_REGEX =
"(?i)(hdfs)|(file)|(s3a)|(viewfs)";
private static final String ROOT = "root"; private static final String ROOT = "root";
private static final AccessControlList EVERYBODY_ACL = private static final AccessControlList EVERYBODY_ACL =
new AccessControlList("*"); new AccessControlList("*");
@ -87,11 +90,13 @@ public class AllocationFileLoaderService extends AbstractService {
private final Clock clock; private final Clock clock;
private long lastSuccessfulReload; // Last time we successfully reloaded queues // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false; private volatile long lastSuccessfulReload;
private volatile boolean lastReloadAttemptFailed = false;
// Path to XML file containing allocations. // Path to XML file containing allocations.
private File allocFile; private Path allocFile;
private volatile FileSystem fs;
private Listener reloadListener; private Listener reloadListener;
@ -110,37 +115,42 @@ public class AllocationFileLoaderService extends AbstractService {
public AllocationFileLoaderService(Clock clock) { public AllocationFileLoaderService(Clock clock) {
super(AllocationFileLoaderService.class.getName()); super(AllocationFileLoaderService.class.getName());
this.clock = clock; this.clock = clock;
} }
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
this.allocFile = getAllocationFile(conf); this.allocFile = getAllocationFile(conf);
if (allocFile != null) { if(this.allocFile != null) {
this.fs = allocFile.getFileSystem(conf);
reloadThread = new Thread() { reloadThread = new Thread() {
@Override @Override
public void run() { public void run() {
while (running) { while (running) {
long time = clock.getTime(); try {
long lastModified = allocFile.lastModified(); long time = clock.getTime();
if (lastModified > lastSuccessfulReload && long lastModified =
time > lastModified + ALLOC_RELOAD_WAIT_MS) { fs.getFileStatus(allocFile).getModificationTime();
try { if (lastModified > lastSuccessfulReload &&
reloadAllocations(); time > lastModified + ALLOC_RELOAD_WAIT_MS) {
} catch (Exception ex) { try {
reloadAllocations();
} catch (Exception ex) {
if (!lastReloadAttemptFailed) {
LOG.error("Failed to reload fair scheduler config file - " +
"will use existing allocations.", ex);
}
lastReloadAttemptFailed = true;
}
} else if (lastModified == 0L) {
if (!lastReloadAttemptFailed) { if (!lastReloadAttemptFailed) {
LOG.error("Failed to reload fair scheduler config file - " + LOG.warn("Failed to reload fair scheduler config file because"
"will use existing allocations.", ex); + " last modified returned 0. File exists: "
+ fs.exists(allocFile));
} }
lastReloadAttemptFailed = true; lastReloadAttemptFailed = true;
} }
} else if (lastModified == 0l) { } catch (IOException e) {
if (!lastReloadAttemptFailed) { LOG.info("Exception while loading allocation file: " + e);
LOG.warn("Failed to reload fair scheduler config file because" +
" last modified returned 0. File exists: "
+ allocFile.exists());
}
lastReloadAttemptFailed = true;
} }
try { try {
Thread.sleep(reloadIntervalMs); Thread.sleep(reloadIntervalMs);
@ -183,25 +193,35 @@ public class AllocationFileLoaderService extends AbstractService {
* Path to XML file containing allocations. If the * Path to XML file containing allocations. If the
* path is relative, it is searched for in the * path is relative, it is searched for in the
* classpath, but loaded like a regular File. * classpath, but loaded like a regular File.
*
* @throws UnsupportedFileSystemException if path to allocation file uses an
* unsupported filesystem
*/ */
public File getAllocationFile(Configuration conf) { public Path getAllocationFile(Configuration conf)
throws UnsupportedFileSystemException {
String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE, String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE); FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
File allocFile = new File(allocFilePath); Path allocPath = new Path(allocFilePath);
if (!allocFile.isAbsolute()) { String allocPathScheme = allocPath.toUri().getScheme();
if(allocPathScheme != null && !allocPathScheme.matches(SUPPORTED_FS_REGEX)){
throw new UnsupportedFileSystemException("Allocation file "
+ allocFilePath + " uses an unsupported filesystem");
} else if (!allocPath.isAbsolute()) {
URL url = Thread.currentThread().getContextClassLoader() URL url = Thread.currentThread().getContextClassLoader()
.getResource(allocFilePath); .getResource(allocFilePath);
if (url == null) { if (url == null) {
LOG.warn(allocFilePath + " not found on the classpath."); LOG.warn(allocFilePath + " not found on the classpath.");
allocFile = null; allocPath = null;
} else if (!url.getProtocol().equalsIgnoreCase("file")) { } else if (!url.getProtocol().equalsIgnoreCase("file")) {
throw new RuntimeException("Allocation file " + url throw new RuntimeException("Allocation file " + url
+ " found on the classpath is not on the local filesystem."); + " found on the classpath is not on the local filesystem.");
} else { } else {
allocFile = new File(url.getPath()); allocPath = new Path(url.getProtocol(), null, url.getPath());
} }
} else if (allocPath.isAbsoluteAndSchemeAuthorityNull()){
allocPath = new Path("file", null, allocFilePath);
} }
return allocFile; return allocPath;
} }
public synchronized void setReloadListener(Listener reloadListener) { public synchronized void setReloadListener(Listener reloadListener) {
@ -276,7 +296,7 @@ public class AllocationFileLoaderService extends AbstractService {
DocumentBuilderFactory.newInstance(); DocumentBuilderFactory.newInstance();
docBuilderFactory.setIgnoringComments(true); docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = builder.parse(allocFile); Document doc = builder.parse(fs.open(allocFile));
Element root = doc.getDocumentElement(); Element root = doc.getDocumentElement();
if (!"allocations".equals(root.getTagName())) if (!"allocations".equals(root.getTagName()))
throw new AllocationConfigurationException("Bad fair scheduler config " + throw new AllocationConfigurationException("Bad fair scheduler config " +

View File

@ -17,17 +17,12 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
@ -38,6 +33,23 @@ import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestAllocationFileLoaderService { public class TestAllocationFileLoaderService {
final static String TEST_DIR = new File(System.getProperty("test.build.data", final static String TEST_DIR = new File(System.getProperty("test.build.data",
@ -45,16 +57,60 @@ public class TestAllocationFileLoaderService {
final static String ALLOC_FILE = new File(TEST_DIR, final static String ALLOC_FILE = new File(TEST_DIR,
"test-queues").getAbsolutePath(); "test-queues").getAbsolutePath();
private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
@Test
public void testGetAllocationFileFromFileSystem()
throws IOException, URISyntaxException {
Configuration conf = new YarnConfiguration();
File baseDir =
new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
MiniDFSCluster hdfsCluster = builder.build();
String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort()
+ Path.SEPARATOR + TEST_FAIRSCHED_XML;
URL fschedURL = Thread.currentThread().getContextClassLoader()
.getResource(TEST_FAIRSCHED_XML);
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(fsAllocPath, allocationFile.toString());
assertTrue(fs.exists(allocationFile));
hdfsCluster.shutdown(true);
}
@Test (expected = UnsupportedFileSystemException.class)
public void testDenyGetAllocationFileFromUnsupportedFileSystem()
throws UnsupportedFileSystemException {
Configuration conf = new YarnConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
allocLoader.getAllocationFile(conf);
}
@Test @Test
public void testGetAllocationFileFromClasspath() { public void testGetAllocationFileFromClasspath() {
Configuration conf = new Configuration(); try {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, Configuration conf = new Configuration();
"test-fair-scheduler.xml"); FileSystem fs = FileSystem.get(conf);
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
File allocationFile = allocLoader.getAllocationFile(conf); TEST_FAIRSCHED_XML);
assertEquals("test-fair-scheduler.xml", allocationFile.getName()); AllocationFileLoaderService allocLoader =
assertTrue(allocationFile.exists()); new AllocationFileLoaderService();
Path allocationFile = allocLoader.getAllocationFile(conf);
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
assertTrue(fs.exists(allocationFile));
} catch (IOException e) {
fail("Unable to access allocation file from classpath: " + e);
}
} }
@Test (timeout = 10000) @Test (timeout = 10000)