Merge r1529005 from trunk to branch-2 for MAPREDUCE-5489. MR jobs hangs as it does not use the node-blacklisting feature in RM requests (Zhijie Shen via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1529007 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
562af72497
commit
53df9bdb14
|
@ -135,6 +135,9 @@ Release 2.1.2 - UNRELEASED
|
||||||
tests jar is breaking tests for downstream components (Robert Kanter via
|
tests jar is breaking tests for downstream components (Robert Kanter via
|
||||||
Sandy Ryza)
|
Sandy Ryza)
|
||||||
|
|
||||||
|
MAPREDUCE-5489. MR jobs hangs as it does not use the node-blacklisting
|
||||||
|
feature in RM requests (Zhijie Shen via bikas)
|
||||||
|
|
||||||
Release 2.1.1-beta - 2013-09-23
|
Release 2.1.1-beta - 2013-09-23
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
@ -86,6 +87,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
||||||
private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
|
private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
|
||||||
private final Set<String> blacklistedNodes = Collections
|
private final Set<String> blacklistedNodes = Collections
|
||||||
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
||||||
|
private final Set<String> blacklistAdditions = Collections
|
||||||
|
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
||||||
|
private final Set<String> blacklistRemovals = Collections
|
||||||
|
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
|
||||||
|
|
||||||
public RMContainerRequestor(ClientService clientService, AppContext context) {
|
public RMContainerRequestor(ClientService clientService, AppContext context) {
|
||||||
super(clientService, context);
|
super(clientService, context);
|
||||||
|
@ -145,10 +150,13 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AllocateResponse makeRemoteRequest() throws IOException {
|
protected AllocateResponse makeRemoteRequest() throws IOException {
|
||||||
|
ResourceBlacklistRequest blacklistRequest =
|
||||||
|
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
|
||||||
|
new ArrayList<String>(blacklistRemovals));
|
||||||
AllocateRequest allocateRequest =
|
AllocateRequest allocateRequest =
|
||||||
AllocateRequest.newInstance(lastResponseID,
|
AllocateRequest.newInstance(lastResponseID,
|
||||||
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
|
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
|
||||||
new ArrayList<ContainerId>(release), null);
|
new ArrayList<ContainerId>(release), blacklistRequest);
|
||||||
AllocateResponse allocateResponse;
|
AllocateResponse allocateResponse;
|
||||||
try {
|
try {
|
||||||
allocateResponse = scheduler.allocate(allocateRequest);
|
allocateResponse = scheduler.allocate(allocateRequest);
|
||||||
|
@ -172,6 +180,14 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
||||||
|
|
||||||
ask.clear();
|
ask.clear();
|
||||||
release.clear();
|
release.clear();
|
||||||
|
|
||||||
|
if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
|
||||||
|
LOG.info("Update the blacklist for " + applicationId +
|
||||||
|
": blacklistAdditions=" + blacklistAdditions.size() +
|
||||||
|
" blacklistRemovals=" + blacklistRemovals.size());
|
||||||
|
}
|
||||||
|
blacklistAdditions.clear();
|
||||||
|
blacklistRemovals.clear();
|
||||||
return allocateResponse;
|
return allocateResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,11 +211,17 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
||||||
if (ignoreBlacklisting.compareAndSet(false, true)) {
|
if (ignoreBlacklisting.compareAndSet(false, true)) {
|
||||||
LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
|
LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
|
||||||
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
|
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
|
||||||
|
// notify RM to ignore all the blacklisted nodes
|
||||||
|
blacklistAdditions.clear();
|
||||||
|
blacklistRemovals.addAll(blacklistedNodes);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ignoreBlacklisting.compareAndSet(true, false)) {
|
if (ignoreBlacklisting.compareAndSet(true, false)) {
|
||||||
LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
|
LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
|
||||||
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
|
+ ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
|
||||||
|
// notify RM of all the blacklisted nodes
|
||||||
|
blacklistAdditions.addAll(blacklistedNodes);
|
||||||
|
blacklistRemovals.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -221,6 +243,9 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
||||||
LOG.info(failures + " failures on node " + hostName);
|
LOG.info(failures + " failures on node " + hostName);
|
||||||
if (failures >= maxTaskFailuresPerNode) {
|
if (failures >= maxTaskFailuresPerNode) {
|
||||||
blacklistedNodes.add(hostName);
|
blacklistedNodes.add(hostName);
|
||||||
|
if (!ignoreBlacklisting.get()) {
|
||||||
|
blacklistAdditions.add(hostName);
|
||||||
|
}
|
||||||
//Even if blacklisting is ignored, continue to remove the host from
|
//Even if blacklisting is ignored, continue to remove the host from
|
||||||
// the request table. The RM may have additional nodes it can allocate on.
|
// the request table. The RM may have additional nodes it can allocate on.
|
||||||
LOG.info("Blacklisted host " + hostName);
|
LOG.info("Blacklisted host " + hostName);
|
||||||
|
|
|
@ -880,8 +880,10 @@ public class TestRMContainerAllocator {
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
assertBlacklistAdditionsAndRemovals(2, 0, rm);
|
||||||
|
|
||||||
// mark h1/h2 as bad nodes
|
// mark h1/h2 as bad nodes
|
||||||
nodeManager1.nodeHeartbeat(false);
|
nodeManager1.nodeHeartbeat(false);
|
||||||
|
@ -890,12 +892,14 @@ public class TestRMContainerAllocator {
|
||||||
|
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
nodeManager3.nodeHeartbeat(true); // Node heartbeat
|
nodeManager3.nodeHeartbeat(true); // Node heartbeat
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(0, 0, rm);
|
||||||
|
|
||||||
Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
|
Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
|
||||||
|
|
||||||
|
@ -948,7 +952,7 @@ public class TestRMContainerAllocator {
|
||||||
// Known=1, blacklisted=0, ignore should be false - assign first container
|
// Known=1, blacklisted=0, ignore should be false - assign first container
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
|
getContainerOnHost(jobId, 1, 1024, new String[] { "h1" },
|
||||||
nodeManagers[0], dispatcher, allocator);
|
nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
|
LOG.info("Failing container _1 on H1 (Node should be blacklisted and"
|
||||||
|
@ -958,44 +962,52 @@ public class TestRMContainerAllocator {
|
||||||
allocator.sendFailure(f1);
|
allocator.sendFailure(f1);
|
||||||
|
|
||||||
// Test single node.
|
// Test single node.
|
||||||
|
// Known=1, blacklisted=1, ignore should be true - assign 0
|
||||||
|
// Because makeRemoteRequest will not be aware of it until next call
|
||||||
|
// The current call will send blacklisted node "h1" to RM
|
||||||
|
assigned =
|
||||||
|
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
|
||||||
|
nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm);
|
||||||
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
// Known=1, blacklisted=1, ignore should be true - assign 1
|
// Known=1, blacklisted=1, ignore should be true - assign 1
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
|
getContainerOnHost(jobId, 2, 1024, new String[] { "h1" },
|
||||||
nodeManagers[0], dispatcher, allocator);
|
nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
|
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
|
||||||
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
|
// Known=2, blacklisted=1, ignore should be true - assign 1 anyway.
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
|
getContainerOnHost(jobId, 3, 1024, new String[] { "h2" },
|
||||||
nodeManagers[1], dispatcher, allocator);
|
nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
|
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
|
||||||
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
|
// Known=3, blacklisted=1, ignore should be true - assign 1 anyway.
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
|
getContainerOnHost(jobId, 4, 1024, new String[] { "h3" },
|
||||||
nodeManagers[2], dispatcher, allocator);
|
nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
// Known=3, blacklisted=1, ignore should be true - assign 1
|
// Known=3, blacklisted=1, ignore should be true - assign 1
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
|
getContainerOnHost(jobId, 5, 1024, new String[] { "h1" },
|
||||||
nodeManagers[0], dispatcher, allocator);
|
nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
|
nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher);
|
||||||
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway
|
// Known=4, blacklisted=1, ignore should be false - assign 1 anyway
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
|
getContainerOnHost(jobId, 6, 1024, new String[] { "h4" },
|
||||||
nodeManagers[3], dispatcher, allocator);
|
nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
// Test blacklisting re-enabled.
|
// Test blacklisting re-enabled.
|
||||||
// Known=4, blacklisted=1, ignore should be false - no assignment on h1
|
// Known=4, blacklisted=1, ignore should be false - no assignment on h1
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
|
getContainerOnHost(jobId, 7, 1024, new String[] { "h1" },
|
||||||
nodeManagers[0], dispatcher, allocator);
|
nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
// RMContainerRequestor would have created a replacement request.
|
// RMContainerRequestor would have created a replacement request.
|
||||||
|
|
||||||
|
@ -1004,17 +1016,24 @@ public class TestRMContainerAllocator {
|
||||||
allocator.sendFailure(f2);
|
allocator.sendFailure(f2);
|
||||||
|
|
||||||
// Test ignore blacklisting re-enabled
|
// Test ignore blacklisting re-enabled
|
||||||
|
// Known=4, blacklisted=2, ignore should be true. Should assign 0
|
||||||
|
// container for the same reason above.
|
||||||
|
assigned =
|
||||||
|
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
|
||||||
|
nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm);
|
||||||
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
// Known=4, blacklisted=2, ignore should be true. Should assign 2
|
// Known=4, blacklisted=2, ignore should be true. Should assign 2
|
||||||
// containers.
|
// containers.
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
|
getContainerOnHost(jobId, 8, 1024, new String[] { "h1" },
|
||||||
nodeManagers[0], dispatcher, allocator);
|
nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
|
Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
|
||||||
|
|
||||||
// Known=4, blacklisted=2, ignore should be true.
|
// Known=4, blacklisted=2, ignore should be true.
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
|
getContainerOnHost(jobId, 9, 1024, new String[] { "h2" },
|
||||||
nodeManagers[1], dispatcher, allocator);
|
nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
// Test blacklist while ignore blacklisting enabled
|
// Test blacklist while ignore blacklisting enabled
|
||||||
|
@ -1025,7 +1044,7 @@ public class TestRMContainerAllocator {
|
||||||
// Known=5, blacklisted=3, ignore should be true.
|
// Known=5, blacklisted=3, ignore should be true.
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
|
getContainerOnHost(jobId, 10, 1024, new String[] { "h3" },
|
||||||
nodeManagers[2], dispatcher, allocator);
|
nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
// Assign on 5 more nodes - to re-enable blacklisting
|
// Assign on 5 more nodes - to re-enable blacklisting
|
||||||
|
@ -1034,14 +1053,14 @@ public class TestRMContainerAllocator {
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 11 + i, 1024,
|
getContainerOnHost(jobId, 11 + i, 1024,
|
||||||
new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
|
new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i],
|
||||||
dispatcher, allocator);
|
dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
|
// Test h3 (blacklisted while ignoring blacklisting) is blacklisted.
|
||||||
assigned =
|
assigned =
|
||||||
getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
|
getContainerOnHost(jobId, 20, 1024, new String[] { "h3" },
|
||||||
nodeManagers[2], dispatcher, allocator);
|
nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1055,7 +1074,9 @@ public class TestRMContainerAllocator {
|
||||||
private
|
private
|
||||||
List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
|
List<TaskAttemptContainerAssignedEvent> getContainerOnHost(JobId jobId,
|
||||||
int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
|
int taskAttemptId, int memory, String[] hosts, MockNM mockNM,
|
||||||
DrainDispatcher dispatcher, MyContainerAllocator allocator)
|
DrainDispatcher dispatcher, MyContainerAllocator allocator,
|
||||||
|
int expectedAdditions1, int expectedRemovals1,
|
||||||
|
int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
ContainerRequestEvent reqEvent =
|
ContainerRequestEvent reqEvent =
|
||||||
createReq(jobId, taskAttemptId, memory, hosts);
|
createReq(jobId, taskAttemptId, memory, hosts);
|
||||||
|
@ -1064,6 +1085,8 @@ public class TestRMContainerAllocator {
|
||||||
// Send the request to the RM
|
// Send the request to the RM
|
||||||
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
|
List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(
|
||||||
|
expectedAdditions1, expectedRemovals1, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
// Heartbeat from the required nodeManager
|
// Heartbeat from the required nodeManager
|
||||||
|
@ -1072,6 +1095,8 @@ public class TestRMContainerAllocator {
|
||||||
|
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(
|
||||||
|
expectedAdditions2, expectedRemovals2, rm);
|
||||||
return assigned;
|
return assigned;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1137,6 +1162,7 @@ public class TestRMContainerAllocator {
|
||||||
LOG.info("RM Heartbeat (To process the scheduled containers)");
|
LOG.info("RM Heartbeat (To process the scheduled containers)");
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
|
||||||
|
|
||||||
LOG.info("Failing container _1 on H1 (should blacklist the node)");
|
LOG.info("Failing container _1 on H1 (should blacklist the node)");
|
||||||
|
@ -1153,6 +1179,7 @@ public class TestRMContainerAllocator {
|
||||||
//Update the Scheduler with the new requests.
|
//Update the Scheduler with the new requests.
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(1, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
// send another request with different resource and priority
|
// send another request with different resource and priority
|
||||||
|
@ -1171,6 +1198,7 @@ public class TestRMContainerAllocator {
|
||||||
LOG.info("RM Heartbeat (To process the scheduled containers)");
|
LOG.info("RM Heartbeat (To process the scheduled containers)");
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
//RMContainerAllocator gets assigned a p:5 on a blacklisted node.
|
//RMContainerAllocator gets assigned a p:5 on a blacklisted node.
|
||||||
|
@ -1179,6 +1207,7 @@ public class TestRMContainerAllocator {
|
||||||
LOG.info("RM Heartbeat (To process the re-scheduled containers)");
|
LOG.info("RM Heartbeat (To process the re-scheduled containers)");
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
assertBlacklistAdditionsAndRemovals(0, 0, rm);
|
||||||
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
|
||||||
|
|
||||||
//Hearbeat from H3 to schedule on this host.
|
//Hearbeat from H3 to schedule on this host.
|
||||||
|
@ -1188,6 +1217,7 @@ public class TestRMContainerAllocator {
|
||||||
|
|
||||||
LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
|
LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
|
||||||
assigned = allocator.schedule();
|
assigned = allocator.schedule();
|
||||||
|
assertBlacklistAdditionsAndRemovals(0, 0, rm);
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
||||||
// For debugging
|
// For debugging
|
||||||
|
@ -1206,6 +1236,14 @@ public class TestRMContainerAllocator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void assertBlacklistAdditionsAndRemovals(
|
||||||
|
int expectedAdditions, int expectedRemovals, MyResourceManager rm) {
|
||||||
|
Assert.assertEquals(expectedAdditions,
|
||||||
|
rm.getMyFifoScheduler().lastBlacklistAdditions.size());
|
||||||
|
Assert.assertEquals(expectedRemovals,
|
||||||
|
rm.getMyFifoScheduler().lastBlacklistRemovals.size());
|
||||||
|
}
|
||||||
|
|
||||||
private static class MyFifoScheduler extends FifoScheduler {
|
private static class MyFifoScheduler extends FifoScheduler {
|
||||||
|
|
||||||
public MyFifoScheduler(RMContext rmContext) {
|
public MyFifoScheduler(RMContext rmContext) {
|
||||||
|
@ -1220,6 +1258,8 @@ public class TestRMContainerAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
List<ResourceRequest> lastAsk = null;
|
List<ResourceRequest> lastAsk = null;
|
||||||
|
List<String> lastBlacklistAdditions;
|
||||||
|
List<String> lastBlacklistRemovals;
|
||||||
|
|
||||||
// override this to copy the objects otherwise FifoScheduler updates the
|
// override this to copy the objects otherwise FifoScheduler updates the
|
||||||
// numContainers in same objects as kept by RMContainerAllocator
|
// numContainers in same objects as kept by RMContainerAllocator
|
||||||
|
@ -1236,6 +1276,8 @@ public class TestRMContainerAllocator {
|
||||||
askCopy.add(reqCopy);
|
askCopy.add(reqCopy);
|
||||||
}
|
}
|
||||||
lastAsk = ask;
|
lastAsk = ask;
|
||||||
|
lastBlacklistAdditions = blacklistAdditions;
|
||||||
|
lastBlacklistRemovals = blacklistRemovals;
|
||||||
return super.allocate(
|
return super.allocate(
|
||||||
applicationAttemptId, askCopy, release,
|
applicationAttemptId, askCopy, release,
|
||||||
blacklistAdditions, blacklistRemovals);
|
blacklistAdditions, blacklistRemovals);
|
||||||
|
|
Loading…
Reference in New Issue