YARN-7622. Allow fair-scheduler configuration on HDFS (gphillips via rkanter)
This commit is contained in:
parent
3ba985997d
commit
7a55044803
|
@ -17,25 +17,15 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
|
@ -45,8 +35,8 @@ import org.apache.hadoop.yarn.security.AccessType;
|
|||
import org.apache.hadoop.yarn.security.Permission;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
@ -57,7 +47,17 @@ import org.w3c.dom.NodeList;
|
|||
import org.w3c.dom.Text;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
|
@ -77,6 +77,9 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
|
||||
public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
|
||||
|
||||
//Permitted allocation file filesystems (case insensitive)
|
||||
private static final String SUPPORTED_FS_REGEX =
|
||||
"(?i)(hdfs)|(file)|(s3a)|(viewfs)";
|
||||
private static final String ROOT = "root";
|
||||
private static final AccessControlList EVERYBODY_ACL =
|
||||
new AccessControlList("*");
|
||||
|
@ -85,12 +88,14 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
|
||||
private final Clock clock;
|
||||
|
||||
private long lastSuccessfulReload; // Last time we successfully reloaded queues
|
||||
private boolean lastReloadAttemptFailed = false;
|
||||
|
||||
// Path to XML file containing allocations.
|
||||
private File allocFile;
|
||||
|
||||
// Last time we successfully reloaded queues
|
||||
private volatile long lastSuccessfulReload;
|
||||
private volatile boolean lastReloadAttemptFailed = false;
|
||||
|
||||
// Path to XML file containing allocations.
|
||||
private Path allocFile;
|
||||
private FileSystem fs;
|
||||
|
||||
private Listener reloadListener;
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -108,19 +113,19 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
public AllocationFileLoaderService(Clock clock) {
|
||||
super(AllocationFileLoaderService.class.getName());
|
||||
this.clock = clock;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
this.allocFile = getAllocationFile(conf);
|
||||
if (allocFile != null) {
|
||||
reloadThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (running) {
|
||||
if(this.allocFile != null) {
|
||||
this.fs = allocFile.getFileSystem(conf);
|
||||
reloadThread = new Thread(() -> {
|
||||
while (running) {
|
||||
try {
|
||||
long time = clock.getTime();
|
||||
long lastModified = allocFile.lastModified();
|
||||
long lastModified =
|
||||
fs.getFileStatus(allocFile).getModificationTime();
|
||||
if (lastModified > lastSuccessfulReload &&
|
||||
time > lastModified + ALLOC_RELOAD_WAIT_MS) {
|
||||
try {
|
||||
|
@ -136,19 +141,21 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
if (!lastReloadAttemptFailed) {
|
||||
LOG.warn("Failed to reload fair scheduler config file because" +
|
||||
" last modified returned 0. File exists: "
|
||||
+ allocFile.exists());
|
||||
+ fs.exists(allocFile));
|
||||
}
|
||||
lastReloadAttemptFailed = true;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(reloadIntervalMs);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.info(
|
||||
"Interrupted while waiting to reload alloc configuration");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Exception while loading allocation file: " + e);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(reloadIntervalMs);
|
||||
} catch (InterruptedException ex) {
|
||||
LOG.info(
|
||||
"Interrupted while waiting to reload alloc configuration");
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
reloadThread.setName("AllocationFileReloader");
|
||||
reloadThread.setDaemon(true);
|
||||
}
|
||||
|
@ -182,24 +189,31 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
* path is relative, it is searched for in the
|
||||
* classpath, but loaded like a regular File.
|
||||
*/
|
||||
public File getAllocationFile(Configuration conf) {
|
||||
public Path getAllocationFile(Configuration conf)
|
||||
throws UnsupportedFileSystemException {
|
||||
String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
|
||||
File allocFile = new File(allocFilePath);
|
||||
if (!allocFile.isAbsolute()) {
|
||||
Path allocPath = new Path(allocFilePath);
|
||||
String allocPathScheme = allocPath.toUri().getScheme();
|
||||
if(allocPathScheme != null && !allocPathScheme.matches(SUPPORTED_FS_REGEX)){
|
||||
throw new UnsupportedFileSystemException("Allocation file "
|
||||
+ allocFilePath + " uses an unsupported filesystem");
|
||||
} else if (!allocPath.isAbsolute()) {
|
||||
URL url = Thread.currentThread().getContextClassLoader()
|
||||
.getResource(allocFilePath);
|
||||
if (url == null) {
|
||||
LOG.warn(allocFilePath + " not found on the classpath.");
|
||||
allocFile = null;
|
||||
allocPath = null;
|
||||
} else if (!url.getProtocol().equalsIgnoreCase("file")) {
|
||||
throw new RuntimeException("Allocation file " + url
|
||||
+ " found on the classpath is not on the local filesystem.");
|
||||
} else {
|
||||
allocFile = new File(url.getPath());
|
||||
allocPath = new Path(url.getProtocol(), null, url.getPath());
|
||||
}
|
||||
} else if (allocPath.isAbsoluteAndSchemeAuthorityNull()){
|
||||
allocPath = new Path("file", null, allocFilePath);
|
||||
}
|
||||
return allocFile;
|
||||
return allocPath;
|
||||
}
|
||||
|
||||
public synchronized void setReloadListener(Listener reloadListener) {
|
||||
|
@ -274,7 +288,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
DocumentBuilderFactory.newInstance();
|
||||
docBuilderFactory.setIgnoringComments(true);
|
||||
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
|
||||
Document doc = builder.parse(allocFile);
|
||||
Document doc = builder.parse(fs.open(allocFile));
|
||||
Element root = doc.getDocumentElement();
|
||||
if (!"allocations".equals(root.getTagName()))
|
||||
throw new AllocationConfigurationException("Bad fair scheduler config " +
|
||||
|
@ -437,7 +451,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||
reservationAcls, newPlacementPolicy, configuredQueues,
|
||||
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
|
||||
|
||||
|
||||
lastSuccessfulReload = clock.getTime();
|
||||
lastReloadAttemptFailed = false;
|
||||
|
||||
|
|
|
@ -17,17 +17,12 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||
|
@ -38,6 +33,23 @@ import org.apache.hadoop.yarn.util.ControlledClock;
|
|||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestAllocationFileLoaderService {
|
||||
|
||||
final static String TEST_DIR = new File(System.getProperty("test.build.data",
|
||||
|
@ -45,16 +57,60 @@ public class TestAllocationFileLoaderService {
|
|||
|
||||
final static String ALLOC_FILE = new File(TEST_DIR,
|
||||
"test-queues").getAbsolutePath();
|
||||
|
||||
private static final String TEST_FAIRSCHED_XML = "test-fair-scheduler.xml";
|
||||
|
||||
@Test
|
||||
public void testGetAllocationFileFromFileSystem()
|
||||
throws IOException, URISyntaxException {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
File baseDir =
|
||||
new File(TEST_DIR + Path.SEPARATOR + "getAllocHDFS").getAbsoluteFile();
|
||||
FileUtil.fullyDelete(baseDir);
|
||||
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
|
||||
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
|
||||
MiniDFSCluster hdfsCluster = builder.build();
|
||||
String fsAllocPath = "hdfs://localhost:" + hdfsCluster.getNameNodePort()
|
||||
+ Path.SEPARATOR + TEST_FAIRSCHED_XML;
|
||||
|
||||
URL fschedURL = Thread.currentThread().getContextClassLoader()
|
||||
.getResource(TEST_FAIRSCHED_XML);
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
|
||||
|
||||
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||
Path allocationFile = allocLoader.getAllocationFile(conf);
|
||||
assertEquals(fsAllocPath, allocationFile.toString());
|
||||
assertTrue(fs.exists(allocationFile));
|
||||
|
||||
hdfsCluster.shutdown(true);
|
||||
}
|
||||
|
||||
@Test (expected = UnsupportedFileSystemException.class)
|
||||
public void testDenyGetAllocationFileFromUnsupportedFileSystem()
|
||||
throws UnsupportedFileSystemException {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
|
||||
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||
|
||||
allocLoader.getAllocationFile(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAllocationFileFromClasspath() {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
"test-fair-scheduler.xml");
|
||||
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||
File allocationFile = allocLoader.getAllocationFile(conf);
|
||||
assertEquals("test-fair-scheduler.xml", allocationFile.getName());
|
||||
assertTrue(allocationFile.exists());
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
TEST_FAIRSCHED_XML);
|
||||
AllocationFileLoaderService allocLoader =
|
||||
new AllocationFileLoaderService();
|
||||
Path allocationFile = allocLoader.getAllocationFile(conf);
|
||||
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
|
||||
assertTrue(fs.exists(allocationFile));
|
||||
} catch (IOException e) {
|
||||
fail("Unable to access allocation file from classpath: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 10000)
|
||||
|
|
Loading…
Reference in New Issue