YARN-8390. Fix API incompatible changes in FairScheduler's AllocationFileLoaderService. (Gergo Repas via Haibo Chen)
This commit is contained in:
parent
ab3885f2c8
commit
ba12f87dcb
|
@ -87,7 +87,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
private Path allocFile;
|
private Path allocFile;
|
||||||
private FileSystem fs;
|
private FileSystem fs;
|
||||||
|
|
||||||
private final Listener reloadListener;
|
private Listener reloadListener;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
|
long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
|
||||||
|
@ -95,16 +95,15 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
private Thread reloadThread;
|
private Thread reloadThread;
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
|
|
||||||
public AllocationFileLoaderService(Listener reloadListener) {
|
public AllocationFileLoaderService() {
|
||||||
this(reloadListener, SystemClock.getInstance());
|
this(SystemClock.getInstance());
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Permission> defaultPermissions;
|
private List<Permission> defaultPermissions;
|
||||||
|
|
||||||
public AllocationFileLoaderService(Listener reloadListener, Clock clock) {
|
public AllocationFileLoaderService(Clock clock) {
|
||||||
super(AllocationFileLoaderService.class.getName());
|
super(AllocationFileLoaderService.class.getName());
|
||||||
this.clock = clock;
|
this.clock = clock;
|
||||||
this.reloadListener = reloadListener;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -209,6 +208,10 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
return allocPath;
|
return allocPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized void setReloadListener(Listener reloadListener) {
|
||||||
|
this.reloadListener = reloadListener;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the allocation list from the allocation config file. This file is
|
* Updates the allocation list from the allocation config file. This file is
|
||||||
* expected to be in the XML format specified in the design doc.
|
* expected to be in the XML format specified in the design doc.
|
||||||
|
@ -350,6 +353,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
public interface Listener {
|
public interface Listener {
|
||||||
void onReload(AllocationConfiguration info) throws IOException;
|
void onReload(AllocationConfiguration info) throws IOException;
|
||||||
|
|
||||||
void onCheck();
|
default void onCheck() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -208,8 +208,7 @@ public class FairScheduler extends
|
||||||
public FairScheduler() {
|
public FairScheduler() {
|
||||||
super(FairScheduler.class.getName());
|
super(FairScheduler.class.getName());
|
||||||
context = new FSContext(this);
|
context = new FSContext(this);
|
||||||
allocsLoader =
|
allocsLoader = new AllocationFileLoaderService();
|
||||||
new AllocationFileLoaderService(new AllocationReloadListener());
|
|
||||||
queueMgr = new QueueManager(this);
|
queueMgr = new QueueManager(this);
|
||||||
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
|
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
|
||||||
}
|
}
|
||||||
|
@ -1438,6 +1437,7 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
allocsLoader.init(conf);
|
allocsLoader.init(conf);
|
||||||
|
allocsLoader.setReloadListener(new AllocationReloadListener());
|
||||||
// If we fail to load allocations file on initialize, we want to fail
|
// If we fail to load allocations file on initialize, we want to fail
|
||||||
// immediately. After a successful load, exceptions on future reloads
|
// immediately. After a successful load, exceptions on future reloads
|
||||||
// will just result in leaving things as they are.
|
// will just result in leaving things as they are.
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
|
@ -33,8 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Fai
|
||||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
|
@ -82,8 +79,7 @@ public class TestAllocationFileLoaderService {
|
||||||
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
|
fs.copyFromLocalFile(new Path(fschedURL.toURI()), new Path(fsAllocPath));
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, fsAllocPath);
|
||||||
|
|
||||||
AllocationFileLoaderService allocLoader =
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
new AllocationFileLoaderService(Mockito.mock(Listener.class));
|
|
||||||
Path allocationFile = allocLoader.getAllocationFile(conf);
|
Path allocationFile = allocLoader.getAllocationFile(conf);
|
||||||
assertEquals(fsAllocPath, allocationFile.toString());
|
assertEquals(fsAllocPath, allocationFile.toString());
|
||||||
assertTrue(fs.exists(allocationFile));
|
assertTrue(fs.exists(allocationFile));
|
||||||
|
@ -96,8 +92,7 @@ public class TestAllocationFileLoaderService {
|
||||||
throws UnsupportedFileSystemException {
|
throws UnsupportedFileSystemException {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, "badfs:///badfile");
|
||||||
AllocationFileLoaderService allocLoader =
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
new AllocationFileLoaderService(Mockito.mock(Listener.class));
|
|
||||||
|
|
||||||
allocLoader.getAllocationFile(conf);
|
allocLoader.getAllocationFile(conf);
|
||||||
}
|
}
|
||||||
|
@ -110,7 +105,7 @@ public class TestAllocationFileLoaderService {
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||||
TEST_FAIRSCHED_XML);
|
TEST_FAIRSCHED_XML);
|
||||||
AllocationFileLoaderService allocLoader =
|
AllocationFileLoaderService allocLoader =
|
||||||
new AllocationFileLoaderService(Mockito.mock(Listener.class));
|
new AllocationFileLoaderService();
|
||||||
Path allocationFile = allocLoader.getAllocationFile(conf);
|
Path allocationFile = allocLoader.getAllocationFile(conf);
|
||||||
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
|
assertEquals(TEST_FAIRSCHED_XML, allocationFile.getName());
|
||||||
assertTrue(fs.exists(allocationFile));
|
assertTrue(fs.exists(allocationFile));
|
||||||
|
@ -139,11 +134,12 @@ public class TestAllocationFileLoaderService {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(
|
||||||
AllocationFileLoaderService allocLoader =
|
clock);
|
||||||
new AllocationFileLoaderService(confHolder, clock);
|
|
||||||
allocLoader.reloadIntervalMs = 5;
|
allocLoader.reloadIntervalMs = 5;
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
AllocationConfiguration allocConf = confHolder.allocConf;
|
AllocationConfiguration allocConf = confHolder.allocConf;
|
||||||
|
|
||||||
|
@ -209,9 +205,7 @@ public class TestAllocationFileLoaderService {
|
||||||
public void testAllocationFileParsing() throws Exception {
|
public void testAllocationFileParsing() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
|
|
||||||
AllocationFileWriter
|
AllocationFileWriter
|
||||||
.create()
|
.create()
|
||||||
|
@ -284,6 +278,8 @@ public class TestAllocationFileLoaderService {
|
||||||
.writeToFile(ALLOC_FILE);
|
.writeToFile(ALLOC_FILE);
|
||||||
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
AllocationConfiguration queueConf = confHolder.allocConf;
|
AllocationConfiguration queueConf = confHolder.allocConf;
|
||||||
|
|
||||||
|
@ -431,9 +427,7 @@ public class TestAllocationFileLoaderService {
|
||||||
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
|
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
|
|
||||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
out.println("<?xml version=\"1.0\"?>");
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
@ -479,6 +473,8 @@ public class TestAllocationFileLoaderService {
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
AllocationConfiguration queueConf = confHolder.allocConf;
|
AllocationConfiguration queueConf = confHolder.allocConf;
|
||||||
|
|
||||||
|
@ -554,10 +550,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
AllocationConfiguration allocConf = confHolder.allocConf;
|
AllocationConfiguration allocConf = confHolder.allocConf;
|
||||||
|
|
||||||
|
@ -588,10 +584,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -612,10 +608,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -636,10 +632,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -658,10 +654,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
try {
|
try {
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
} catch (AllocationConfigurationException ex) {
|
} catch (AllocationConfigurationException ex) {
|
||||||
|
@ -689,10 +685,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
try {
|
try {
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
} catch (AllocationConfigurationException ex) {
|
} catch (AllocationConfigurationException ex) {
|
||||||
|
@ -718,10 +714,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
AllocationConfiguration queueConf = confHolder.allocConf;
|
AllocationConfiguration queueConf = confHolder.allocConf;
|
||||||
// Check whether queue 'parent' and 'child' are loaded successfully
|
// Check whether queue 'parent' and 'child' are loaded successfully
|
||||||
|
@ -749,10 +745,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -771,10 +767,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -797,10 +793,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
|
|
||||||
AllocationConfiguration allocConf = confHolder.allocConf;
|
AllocationConfiguration allocConf = confHolder.allocConf;
|
||||||
|
@ -857,10 +853,10 @@ public class TestAllocationFileLoaderService {
|
||||||
out.println("</allocations>");
|
out.println("</allocations>");
|
||||||
out.close();
|
out.close();
|
||||||
|
|
||||||
ReloadListener confHolder = new ReloadListener();
|
AllocationFileLoaderService allocLoader = new AllocationFileLoaderService();
|
||||||
AllocationFileLoaderService allocLoader =
|
|
||||||
new AllocationFileLoaderService(confHolder);
|
|
||||||
allocLoader.init(conf);
|
allocLoader.init(conf);
|
||||||
|
ReloadListener confHolder = new ReloadListener();
|
||||||
|
allocLoader.setReloadListener(confHolder);
|
||||||
allocLoader.reloadAllocations();
|
allocLoader.reloadAllocations();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue