YARN-10049. FIFOOrderingPolicy Improvements. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2022-03-10 22:15:35 +01:00
parent d0fa9b5775
commit 481da19494
2 changed files with 71 additions and 25 deletions

View File

@ -29,6 +29,11 @@ public class FifoComparator
@Override @Override
public int compare(SchedulableEntity r1, SchedulableEntity r2) { public int compare(SchedulableEntity r1, SchedulableEntity r2) {
int res = r1.compareInputOrderTo(r2); int res = r1.compareInputOrderTo(r2);
if (res == 0) {
res = (int) Math.signum(r1.getStartTime() - r2.getStartTime());
}
return res; return res;
} }
} }

View File

@ -18,16 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.apache.hadoop.yarn.api.records.Priority; public
class TestFifoOrderingPolicy {
import static org.assertj.core.api.Assertions.assertThat;
public class TestFifoOrderingPolicy {
@Test @Test
public void testFifoOrderingPolicy() { public void testFifoOrderingPolicy() {
@ -36,13 +39,17 @@ public class TestFifoOrderingPolicy {
MockSchedulableEntity r1 = new MockSchedulableEntity(); MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity(); MockSchedulableEntity r2 = new MockSchedulableEntity();
assertThat(policy.getComparator().compare(r1, r2)).isEqualTo(0); assertEquals("The comparator should return 0 because the entities are created with " +
"the same values.", 0,
policy.getComparator().compare(r1, r2));
r1.setSerial(1); r1.setSerial(1);
assertThat(policy.getComparator().compare(r1, r2)).isEqualTo(1); assertEquals("The lhs entity has a larger serial, the comparator return " +
"value should be 1.", 1, policy.getComparator().compare(r1, r2));
r2.setSerial(2); r2.setSerial(2);
assertThat(policy.getComparator().compare(r1, r2)).isEqualTo(-1); Assert.assertEquals("The rhs entity has a larger serial, the comparator return " +
"value should be -1.", -1, policy.getComparator().compare(r1, r2));
} }
@Test @Test
@ -63,46 +70,80 @@ public class TestFifoOrderingPolicy {
schedOrder.addSchedulableEntity(msp3); schedOrder.addSchedulableEntity(msp3);
//Assignment, oldest to youngest //Assignment, oldest to youngest
checkSerials(schedOrder.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR), new long[]{1, 2, 3}); checkSerials(Arrays.asList(1L, 2L, 3L), schedOrder.getAssignmentIterator(
IteratorSelector.EMPTY_ITERATOR_SELECTOR));
//Preemption, youngest to oldest //Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1}); checkSerials(Arrays.asList(3L, 2L, 1L), schedOrder.getPreemptionIterator());
} }
public void checkSerials(Iterator<MockSchedulableEntity> si, public void checkSerials(List<Long> expectedSerials, Iterator<MockSchedulableEntity>
long[] serials) { actualSerialIterator) {
for (int i = 0;i < serials.length;i++) { for (long expectedSerial : expectedSerials) {
Assert.assertEquals(si.next().getSerial(), assertEquals(expectedSerial, actualSerialIterator.next().getSerial());
serials[i]);
} }
} }
@Test @Test
public void testFifoOrderingPolicyAlongWithPriorty() { public void testFifoOrderingPolicyAlongWithPriority() {
FifoOrderingPolicy<MockSchedulableEntity> policy = FifoOrderingPolicy<MockSchedulableEntity> policy =
new FifoOrderingPolicy<MockSchedulableEntity>(); new FifoOrderingPolicy<MockSchedulableEntity>();
MockSchedulableEntity r1 = new MockSchedulableEntity(); MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity(); MockSchedulableEntity r2 = new MockSchedulableEntity();
Priority p1 = Priority.newInstance(1); assertEquals("Both r1 and r2 priority is null, the comparator should return 0.", 0,
Priority p2 = Priority.newInstance(0); policy.getComparator().compare(r1, r2));
// Both r1 and r1 priority is null Priority p2 = Priority.newInstance(0);
Assert.assertEquals(0, policy.getComparator().compare(r1, r2));
// r1 is null and r2 is not null // r1 is null and r2 is not null
r2.setApplicationPriority(p2); r2.setApplicationPriority(p2);
Assert.assertEquals(-1, policy.getComparator().compare(r1, r2)); Assert.assertTrue("The priority of r1 is null, the priority of r2 is not null, " +
"the comparator should return a negative value.",
policy.getComparator().compare(r1, r2) < 0);
Priority p1 = Priority.newInstance(1);
// r1 is not null and r2 is null // r1 is not null and r2 is null
r2.setApplicationPriority(null);
r1.setApplicationPriority(p1); r1.setApplicationPriority(p1);
Assert.assertEquals(1, policy.getComparator().compare(r1, r2)); r2.setApplicationPriority(null);
assertTrue("The priority of r1 is not null, the priority of r2 is null," +
"the comparator should return a positive value.",
policy.getComparator().compare(r1, r2) > 0);
// r1 is not null and r2 is not null // r1 is not null and r2 is not null
r1.setApplicationPriority(p1); r1.setApplicationPriority(p1);
r2.setApplicationPriority(p2); r2.setApplicationPriority(p2);
Assert.assertEquals(-1, policy.getComparator().compare(r1, r2)); Assert.assertTrue("Both priorities are not null, the r1 has higher priority, " +
"the result should be a negative value.",
policy.getComparator().compare(r1, r2) < 0);
} }
@Test
public void testOrderingUsingAppSubmitTime() {
FifoOrderingPolicy<MockSchedulableEntity> policy =
new FifoOrderingPolicy<MockSchedulableEntity>();
MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity();
// R1, R2 has been started at same time
assertEquals(r1.getStartTime(), r2.getStartTime());
// No changes, equal
assertEquals("The submit times are the same, the comparator should return 0.", 0,
policy.getComparator().compare(r1, r2));
// R2 has been started after R1
r1.setStartTime(5);
r2.setStartTime(10);
Assert.assertTrue("r2 was started after r1, " +
"the comparator should return a negative value.",
policy.getComparator().compare(r1, r2) < 0);
// R1 has been started after R2
r1.setStartTime(10);
r2.setStartTime(5);
Assert.assertTrue("r2 was started before r1, the comparator should return a positive value.",
policy.getComparator().compare(r1, r2) > 0);
}
} }