From ff67c68d3c29e29ace96da09f18e18eba11111f1 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Wed, 10 Jan 2018 15:03:33 -0800 Subject: [PATCH] YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter) --- .../fair/AllocationFileLoaderService.java | 118 ++++++++++-------- .../fair/TestAllocationFileLoaderService.java | 92 +++++++++++--- 2 files changed, 143 insertions(+), 67 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 3f409e4a854..d64ee55ad24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -17,25 +17,16 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.DocumentBuilderFactory; -import javax.xml.parsers.ParserConfigurationException; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CharMatcher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -46,8 +37,8 @@ import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -58,8 +49,17 @@ import org.w3c.dom.NodeList; import org.w3c.dom.Text; import org.xml.sax.SAXException; -import com.google.common.base.CharMatcher; -import com.google.common.annotations.VisibleForTesting; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; @Public @Unstable @@ -79,6 +79,9 @@ public class AllocationFileLoaderService extends AbstractService { public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + //Permitted allocation file filesystems (case insensitive) + private static final String SUPPORTED_FS_REGEX = + "(?i)(hdfs)|(file)|(s3a)|(viewfs)"; private static final String ROOT = "root"; private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); @@ -87,12 +90,14 @@ public class AllocationFileLoaderService extends AbstractService { private final Clock clock; - private long lastSuccessfulReload; // Last time we successfully reloaded queues - private boolean lastReloadAttemptFailed = false; - - // Path to XML file containing allocations. - private File allocFile; - + // Last time we successfully reloaded queues + private volatile long lastSuccessfulReload; + private volatile boolean lastReloadAttemptFailed = false; + + // Path to XML file containing allocations. + private Path allocFile; + private volatile FileSystem fs; + private Listener reloadListener; @VisibleForTesting @@ -110,37 +115,42 @@ public class AllocationFileLoaderService extends AbstractService { public AllocationFileLoaderService(Clock clock) { super(AllocationFileLoaderService.class.getName()); this.clock = clock; - } @Override public void serviceInit(Configuration conf) throws Exception { this.allocFile = getAllocationFile(conf); - if (allocFile != null) { + if(this.allocFile != null) { + this.fs = allocFile.getFileSystem(conf); reloadThread = new Thread() { @Override public void run() { while (running) { - long time = clock.getTime(); - long lastModified = allocFile.lastModified(); - if (lastModified > lastSuccessfulReload && - time > lastModified + ALLOC_RELOAD_WAIT_MS) { - try { - reloadAllocations(); - } catch (Exception ex) { + try { + long time = clock.getTime(); + long lastModified = + fs.getFileStatus(allocFile).getModificationTime(); + if (lastModified > lastSuccessfulReload && + time > lastModified + ALLOC_RELOAD_WAIT_MS) { + try { + reloadAllocations(); + } catch (Exception ex) { + if (!lastReloadAttemptFailed) { + LOG.error("Failed to reload fair scheduler config file - " + + "will use existing allocations.", ex); + } + lastReloadAttemptFailed = true; + } + } else if (lastModified == 0L) { if (!lastReloadAttemptFailed) { - LOG.error("Failed to reload fair scheduler config file - " + - "will use existing allocations.", ex); + LOG.warn("Failed to reload fair scheduler config file because" + + " last modified returned 0. File exists: " + + fs.exists(allocFile)); } lastReloadAttemptFailed = true; } - } else if (lastModified == 0l) { - if (!lastReloadAttemptFailed) { - LOG.warn("Failed to reload fair scheduler config file because" + - " last modified returned 0. File exists: " - + allocFile.exists()); - } - lastReloadAttemptFailed = true; + } catch (IOException e) { + LOG.info("Exception while loading allocation file: " + e); } try { Thread.sleep(reloadIntervalMs); @@ -183,25 +193,35 @@ public class AllocationFileLoaderService extends AbstractService { * Path to XML file containing allocations. If the * path is relative, it is searched for in the * classpath, but loaded like a regular File. + * + * @throws UnsupportedFileSystemException if path to allocation file uses an + * unsupported filesystem */ - public File getAllocationFile(Configuration conf) { + public Path getAllocationFile(Configuration conf) + throws UnsupportedFileSystemException { String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE, FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE); - File allocFile = new File(allocFilePath); - if (!allocFile.isAbsolute()) { + Path allocPath = new Path(allocFilePath); + String allocPathScheme = allocPath.toUri().getScheme(); + if(allocPathScheme != null && !allocPathScheme.matches(SUPPORTED_FS_REGEX)){ + throw new UnsupportedFileSystemException("Allocation file " + + allocFilePath + " uses an unsupported filesystem"); + } else if (!allocPath.isAbsolute()) { URL url = Thread.currentThread().getContextClassLoader() .getResource(allocFilePath); if (url == null) { LOG.warn(allocFilePath + " not found on the classpath."); - allocFile = null; + allocPath = null; } else if (!url.getProtocol().equalsIgnoreCase("file")) { throw new RuntimeException("Allocation file " + url + " found on the classpath is not on the local filesystem."); } else { - allocFile = new File(url.getPath()); + allocPath = new Path(url.getProtocol(), null, url.getPath()); } + } else if (allocPath.isAbsoluteAndSchemeAuthorityNull()){ + allocPath = new Path("file", null, allocFilePath); } - return allocFile; + return allocPath; } public synchronized void setReloadListener(Listener reloadListener) { @@ -276,7 +296,7 @@ public class AllocationFileLoaderService extends AbstractService { DocumentBuilderFactory.newInstance(); docBuilderFactory.setIgnoringComments(true); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); - Document doc = builder.parse(allocFile); + Document doc = builder.parse(fs.open(allocFile)); Element root = doc.getDocumentElement(); if (!"allocations".equals(root.getTagName())) throw new AllocationConfigurationException("Bad fair scheduler config " + @@ -439,7 +459,7 @@ public class AllocationFileLoaderService extends AbstractService { fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, reservationAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); - + lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 67b46f99398..c46ecd97896 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -17,17 +17,12 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import static org.junit.Assert.*; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.nio.charset.StandardCharsets; -import java.util.List; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; @@ -38,6 +33,23 @@ import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestAllocationFileLoaderService { final static String TEST_DIR = new File(System.getProperty("test.build.data", @@ -45,16 +57,60 @@ public class TestAllocationFileLoaderService { final static String ALLOC_FILE = new File(TEST_DIR, "test-queues").getAbsolutePath(); - + private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml"; + + @Test + public void testGetAllocationFileFromFileSystem() + throws IOException, URISyntaxException { + Configuration conf = new YarnConfiguration(); + File baseDir = + new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile(); + FileUtil.fullyDelete(baseDir); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + MiniDFSCluster hdfsCluster = builder.build(); + String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + + Path.SEPARATOR + TEST_FAIRSCHED_XML; + + URL fschedURL = Thread.currentThread().getContextClassLoader() + .getResource(TEST_FAIRSCHED_XML); + FileSystem fs = FileSystem.get(conf); + fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath)); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + Path allocationFile = allocLoader.getAllocationFile(conf); + assertEquals(fsAllocPath, allocationFile.toString()); + assertTrue(fs.exists(allocationFile)); + + hdfsCluster.shutdown(true); + } + + @Test (expected = UnsupportedFileSystemException.class) + public void testDenyGetAllocationFileFromUnsupportedFileSystem() + throws UnsupportedFileSystemException { + Configuration conf = new YarnConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile"); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + + allocLoader.getAllocationFile(conf); + } + @Test public void testGetAllocationFileFromClasspath() { - Configuration conf = new Configuration(); - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, - "test-fair-scheduler.xml"); - AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - File allocationFile = allocLoader.getAllocationFile(conf); - assertEquals("test-fair-scheduler.xml", allocationFile.getName()); - assertTrue(allocationFile.exists()); + try { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, + TEST_FAIRSCHED_XML); + AllocationFileLoaderService allocLoader = + new AllocationFileLoaderService(); + Path allocationFile = allocLoader.getAllocationFile(conf); + assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName()); + assertTrue(fs.exists(allocationFile)); + } catch (IOException e) { + fail("Unable to access allocation file from classpath: " + e); + } } @Test (timeout = 10000)