YARN-771. AMRMClient support for resource blacklisting (Junping Du via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1519107 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-08-30 22:58:41 +00:00
parent 28ac26bc43
commit bafd302208
5 changed files with 210 additions and 5 deletions

View File

@ -69,6 +69,9 @@ Release 2.1.1-beta - UNRELEASED
YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
vinodkv)
YARN-771. AMRMClient support for resource blacklisting (Junping Du via
bikas)
OPTIMIZATIONS
BUG FIXES

View File

@ -286,4 +286,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
Priority priority,
String resourceName,
Resource capability);
/**
* Update application's blacklist with addition or removal resources.
*
* @param blacklistAdditions list of resources which should be added to the
* application blacklist
* @param blacklistRemovals list of resources which should be removed from the
* application blacklist
*/
public abstract void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals);
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
@ -80,6 +81,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
protected Resource clusterAvailableResources;
protected int clusterNodeCount;
protected final Set<String> blacklistAdditions = new HashSet<String>();
protected final Set<String> blacklistRemovals = new HashSet<String>();
class ResourceRequestInfo {
ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;
@ -199,9 +203,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
ArrayList<ResourceRequest> askList = null;
ArrayList<ContainerId> releaseList = null;
List<ResourceRequest> askList = null;
List<ContainerId> releaseList = null;
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
try {
synchronized (this) {
@ -217,9 +223,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
blacklistToAdd.addAll(blacklistAdditions);
blacklistToRemove.addAll(blacklistRemovals);
ResourceBlacklistRequest blacklistRequest =
(blacklistToAdd != null) || (blacklistToRemove != null) ?
ResourceBlacklistRequest.newInstance(blacklistToAdd,
blacklistToRemove) : null;
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
askList, releaseList, null);
askList, releaseList, blacklistRequest);
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
blacklistRemovals.clear();
}
allocateResponse = rmClient.allocate(allocateRequest);
@ -253,6 +272,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
ask.add(oldAsk);
}
}
blacklistAdditions.addAll(blacklistToAdd);
blacklistRemovals.addAll(blacklistToRemove);
}
}
}
@ -604,4 +626,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
+ " #asks=" + ask.size());
}
}
@Override
public synchronized void updateBlacklist(List<String> blacklistAdditions,
List<String> blacklistRemovals) {
if (blacklistAdditions != null) {
this.blacklistAdditions.addAll(blacklistAdditions);
// if some resources are also in blacklistRemovals updated before, we
// should remove them here.
this.blacklistRemovals.removeAll(blacklistAdditions);
}
if (blacklistRemovals != null) {
this.blacklistRemovals.addAll(blacklistRemovals);
// if some resources are in blacklistAdditions before, we should remove
// them here.
this.blacklistAdditions.removeAll(blacklistRemovals);
}
if (blacklistAdditions != null && blacklistRemovals != null
&& blacklistAdditions.removeAll(blacklistRemovals)) {
// we allow resources to appear in addition list and removal list in the
// same invocation of updateBlacklist(), but should get a warn here.
LOG.warn("The same resources appear in both blacklistAdditions and " +
"blacklistRemovals in updateBlacklist.");
}
}
}

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@ -97,6 +100,7 @@ public class TestAMRMClient {
static String rack;
static String[] nodes;
static String[] racks;
private final static int DEFAULT_ITERATION = 3;
@BeforeClass
public static void setup() throws Exception {
@ -476,6 +480,144 @@ public class TestAMRMClient {
}
}
}
@Test (timeout=60000)
public void testAllocationWithBlacklist() throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
amClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
ContainerRequest storedContainer1 =
new ContainerRequest(capability, nodes, racks, priority);
amClient.addContainerRequest(storedContainer1);
assertTrue(amClient.ask.size() == 3);
assertTrue(amClient.release.size() == 0);
List<String> localNodeBlacklist = new ArrayList<String>();
localNodeBlacklist.add(node);
// put node in black list, so no container assignment
amClient.updateBlacklist(localNodeBlacklist, null);
int allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
// the only node is in blacklist, so no allocation
assertTrue(allocatedContainerCount == 0);
// Remove node from blacklist, so get assigned with 2
amClient.updateBlacklist(null, localNodeBlacklist);
ContainerRequest storedContainer2 =
new ContainerRequest(capability, nodes, racks, priority);
amClient.addContainerRequest(storedContainer2);
allocatedContainerCount = getAllocatedContainersNumber(amClient,
DEFAULT_ITERATION);
assertEquals(allocatedContainerCount, 2);
// Test in case exception in allocate(), blacklist is kept
assertTrue(amClient.blacklistAdditions.isEmpty());
assertTrue(amClient.blacklistRemovals.isEmpty());
// create a invalid ContainerRequest - memory value is minus
ContainerRequest invalidContainerRequest =
new ContainerRequest(Resource.newInstance(-1024, 1),
nodes, racks, priority);
amClient.addContainerRequest(invalidContainerRequest);
amClient.updateBlacklist(localNodeBlacklist, null);
try {
// allocate() should complain as ContainerRequest is invalid.
amClient.allocate(0.1f);
fail("there should be an exception here.");
} catch (Exception e) {
assertEquals(amClient.blacklistAdditions.size(), 1);
}
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
@Test (timeout=60000)
public void testAMRMClientWithBlacklist() throws YarnException, IOException {
AMRMClientImpl<ContainerRequest> amClient = null;
try {
// start am rm client
amClient =
(AMRMClientImpl<ContainerRequest>) AMRMClient
.<ContainerRequest> createAMRMClient();
amClient.init(conf);
amClient.start();
amClient.registerApplicationMaster("Host", 10000, "");
String[] nodes = {"node1", "node2", "node3"};
// Add nodes[0] and nodes[1]
List<String> nodeList01 = new ArrayList<String>();
nodeList01.add(nodes[0]);
nodeList01.add(nodes[1]);
amClient.updateBlacklist(nodeList01, null);
assertEquals(amClient.blacklistAdditions.size(),2);
assertEquals(amClient.blacklistRemovals.size(),0);
// Add nodes[0] again, verify it is not added duplicated.
List<String> nodeList02 = new ArrayList<String>();
nodeList02.add(nodes[0]);
nodeList02.add(nodes[2]);
amClient.updateBlacklist(nodeList02, null);
assertEquals(amClient.blacklistAdditions.size(),3);
assertEquals(amClient.blacklistRemovals.size(),0);
// Add nodes[1] and nodes[2] to removal list,
// Verify addition list remove these two nodes.
List<String> nodeList12 = new ArrayList<String>();
nodeList12.add(nodes[1]);
nodeList12.add(nodes[2]);
amClient.updateBlacklist(null, nodeList12);
assertEquals(amClient.blacklistAdditions.size(),1);
assertEquals(amClient.blacklistRemovals.size(),2);
// Add nodes[1] again to addition list,
// Verify removal list will remove this node.
List<String> nodeList1 = new ArrayList<String>();
nodeList1.add(nodes[1]);
amClient.updateBlacklist(nodeList1, null);
assertEquals(amClient.blacklistAdditions.size(),2);
assertEquals(amClient.blacklistRemovals.size(),1);
} finally {
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
amClient.stop();
}
}
}
private int getAllocatedContainersNumber(
AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft)
throws YarnException, IOException {
int allocatedContainerCount = 0;
while (iterationsLeft-- > 0) {
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertTrue(amClient.ask.size() == 0);
assertTrue(amClient.release.size() == 0);
assertTrue(nodeCount == amClient.getClusterNodeCount());
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
if(allocatedContainerCount == 0) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
return allocatedContainerCount;
}
@Test (timeout=60000)
public void testAMRMClient() throws YarnException, IOException {

View File

@ -125,7 +125,7 @@ public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
@Override
public void setBlacklistAdditions(List<String> resourceNames) {
if (resourceNames == null) {
if (resourceNames == null || resourceNames.isEmpty()) {
if (this.blacklistAdditions != null) {
this.blacklistAdditions.clear();
}
@ -144,7 +144,7 @@ public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
@Override
public void setBlacklistRemovals(List<String> resourceNames) {
if (resourceNames == null) {
if (resourceNames == null || resourceNames.isEmpty()) {
if (this.blacklistRemovals != null) {
this.blacklistRemovals.clear();
}