Merge 1519107. from trunk to branch-2 for YARN-771. AMRMClient support for resource blacklisting (Junping Du via bikas)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1519112 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2982a8ee13
commit
148db9d4f5
|
@ -54,6 +54,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||||
YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
|
YARN-1080. Improved help message for "yarn logs" command. (Xuan Gong via
|
||||||
vinodkv)
|
vinodkv)
|
||||||
|
|
||||||
|
YARN-771. AMRMClient support for resource blacklisting (Junping Du via
|
||||||
|
bikas)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -286,4 +286,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
||||||
Priority priority,
|
Priority priority,
|
||||||
String resourceName,
|
String resourceName,
|
||||||
Resource capability);
|
Resource capability);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the application's blacklist by adding or removing resources.
|
||||||
|
*
|
||||||
|
* @param blacklistAdditions list of resources which should be added to the
|
||||||
|
* application blacklist
|
||||||
|
* @param blacklistRemovals list of resources which should be removed from the
|
||||||
|
* application blacklist
|
||||||
|
*/
|
||||||
|
public abstract void updateBlacklist(List<String> blacklistAdditions,
|
||||||
|
List<String> blacklistRemovals);
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||||
|
@ -80,6 +81,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
protected Resource clusterAvailableResources;
|
protected Resource clusterAvailableResources;
|
||||||
protected int clusterNodeCount;
|
protected int clusterNodeCount;
|
||||||
|
|
||||||
|
protected final Set<String> blacklistAdditions = new HashSet<String>();
|
||||||
|
protected final Set<String> blacklistRemovals = new HashSet<String>();
|
||||||
|
|
||||||
class ResourceRequestInfo {
|
class ResourceRequestInfo {
|
||||||
ResourceRequest remoteRequest;
|
ResourceRequest remoteRequest;
|
||||||
LinkedHashSet<T> containerRequests;
|
LinkedHashSet<T> containerRequests;
|
||||||
|
@ -199,9 +203,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
Preconditions.checkArgument(progressIndicator >= 0,
|
Preconditions.checkArgument(progressIndicator >= 0,
|
||||||
"Progress indicator should not be negative");
|
"Progress indicator should not be negative");
|
||||||
AllocateResponse allocateResponse = null;
|
AllocateResponse allocateResponse = null;
|
||||||
ArrayList<ResourceRequest> askList = null;
|
List<ResourceRequest> askList = null;
|
||||||
ArrayList<ContainerId> releaseList = null;
|
List<ContainerId> releaseList = null;
|
||||||
AllocateRequest allocateRequest = null;
|
AllocateRequest allocateRequest = null;
|
||||||
|
List<String> blacklistToAdd = new ArrayList<String>();
|
||||||
|
List<String> blacklistToRemove = new ArrayList<String>();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
@ -217,9 +223,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
// optimistically clear this collection assuming no RPC failure
|
// optimistically clear this collection assuming no RPC failure
|
||||||
ask.clear();
|
ask.clear();
|
||||||
release.clear();
|
release.clear();
|
||||||
|
|
||||||
|
blacklistToAdd.addAll(blacklistAdditions);
|
||||||
|
blacklistToRemove.addAll(blacklistRemovals);
|
||||||
|
|
||||||
|
ResourceBlacklistRequest blacklistRequest =
|
||||||
|
(blacklistToAdd != null) || (blacklistToRemove != null) ?
|
||||||
|
ResourceBlacklistRequest.newInstance(blacklistToAdd,
|
||||||
|
blacklistToRemove) : null;
|
||||||
|
|
||||||
allocateRequest =
|
allocateRequest =
|
||||||
AllocateRequest.newInstance(lastResponseId, progressIndicator,
|
AllocateRequest.newInstance(lastResponseId, progressIndicator,
|
||||||
askList, releaseList, null);
|
askList, releaseList, blacklistRequest);
|
||||||
|
// clear blacklistAdditions and blacklistRemovals before
|
||||||
|
// unsynchronized part
|
||||||
|
blacklistAdditions.clear();
|
||||||
|
blacklistRemovals.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
allocateResponse = rmClient.allocate(allocateRequest);
|
allocateResponse = rmClient.allocate(allocateRequest);
|
||||||
|
@ -253,6 +272,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
ask.add(oldAsk);
|
ask.add(oldAsk);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blacklistAdditions.addAll(blacklistToAdd);
|
||||||
|
blacklistRemovals.addAll(blacklistToRemove);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -604,4 +626,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
|
||||||
+ " #asks=" + ask.size());
|
+ " #asks=" + ask.size());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void updateBlacklist(List<String> blacklistAdditions,
|
||||||
|
List<String> blacklistRemovals) {
|
||||||
|
|
||||||
|
if (blacklistAdditions != null) {
|
||||||
|
this.blacklistAdditions.addAll(blacklistAdditions);
|
||||||
|
// if some resources are also in blacklistRemovals updated before, we
|
||||||
|
// should remove them here.
|
||||||
|
this.blacklistRemovals.removeAll(blacklistAdditions);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blacklistRemovals != null) {
|
||||||
|
this.blacklistRemovals.addAll(blacklistRemovals);
|
||||||
|
// if some resources are in blacklistAdditions before, we should remove
|
||||||
|
// them here.
|
||||||
|
this.blacklistAdditions.removeAll(blacklistRemovals);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (blacklistAdditions != null && blacklistRemovals != null
|
||||||
|
&& blacklistAdditions.removeAll(blacklistRemovals)) {
|
||||||
|
// we allow resources to appear in addition list and removal list in the
|
||||||
|
// same invocation of updateBlacklist(), but should get a warn here.
|
||||||
|
LOG.warn("The same resources appear in both blacklistAdditions and " +
|
||||||
|
"blacklistRemovals in updateBlacklist.");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,13 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.client.api.impl;
|
package org.apache.hadoop.yarn.client.api.impl;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -97,6 +100,7 @@ public class TestAMRMClient {
|
||||||
static String rack;
|
static String rack;
|
||||||
static String[] nodes;
|
static String[] nodes;
|
||||||
static String[] racks;
|
static String[] racks;
|
||||||
|
private final static int DEFAULT_ITERATION = 3;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setup() throws Exception {
|
public static void setup() throws Exception {
|
||||||
|
@ -477,6 +481,144 @@ public class TestAMRMClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAllocationWithBlacklist() throws YarnException, IOException {
|
||||||
|
AMRMClientImpl<ContainerRequest> amClient = null;
|
||||||
|
try {
|
||||||
|
// start am rm client
|
||||||
|
amClient =
|
||||||
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
||||||
|
.<ContainerRequest> createAMRMClient();
|
||||||
|
amClient.init(conf);
|
||||||
|
amClient.start();
|
||||||
|
amClient.registerApplicationMaster("Host", 10000, "");
|
||||||
|
|
||||||
|
assertTrue(amClient.ask.size() == 0);
|
||||||
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
|
||||||
|
ContainerRequest storedContainer1 =
|
||||||
|
new ContainerRequest(capability, nodes, racks, priority);
|
||||||
|
amClient.addContainerRequest(storedContainer1);
|
||||||
|
assertTrue(amClient.ask.size() == 3);
|
||||||
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
|
||||||
|
List<String> localNodeBlacklist = new ArrayList<String>();
|
||||||
|
localNodeBlacklist.add(node);
|
||||||
|
|
||||||
|
// put node in black list, so no container assignment
|
||||||
|
amClient.updateBlacklist(localNodeBlacklist, null);
|
||||||
|
|
||||||
|
int allocatedContainerCount = getAllocatedContainersNumber(amClient,
|
||||||
|
DEFAULT_ITERATION);
|
||||||
|
// the only node is in blacklist, so no allocation
|
||||||
|
assertTrue(allocatedContainerCount == 0);
|
||||||
|
|
||||||
|
// Remove node from blacklist, so get assigned with 2
|
||||||
|
amClient.updateBlacklist(null, localNodeBlacklist);
|
||||||
|
ContainerRequest storedContainer2 =
|
||||||
|
new ContainerRequest(capability, nodes, racks, priority);
|
||||||
|
amClient.addContainerRequest(storedContainer2);
|
||||||
|
allocatedContainerCount = getAllocatedContainersNumber(amClient,
|
||||||
|
DEFAULT_ITERATION);
|
||||||
|
assertEquals(allocatedContainerCount, 2);
|
||||||
|
|
||||||
|
// Test in case exception in allocate(), blacklist is kept
|
||||||
|
assertTrue(amClient.blacklistAdditions.isEmpty());
|
||||||
|
assertTrue(amClient.blacklistRemovals.isEmpty());
|
||||||
|
|
||||||
|
// create a invalid ContainerRequest - memory value is minus
|
||||||
|
ContainerRequest invalidContainerRequest =
|
||||||
|
new ContainerRequest(Resource.newInstance(-1024, 1),
|
||||||
|
nodes, racks, priority);
|
||||||
|
amClient.addContainerRequest(invalidContainerRequest);
|
||||||
|
amClient.updateBlacklist(localNodeBlacklist, null);
|
||||||
|
try {
|
||||||
|
// allocate() should complain as ContainerRequest is invalid.
|
||||||
|
amClient.allocate(0.1f);
|
||||||
|
fail("there should be an exception here.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertEquals(amClient.blacklistAdditions.size(), 1);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||||
|
amClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout=60000)
|
||||||
|
public void testAMRMClientWithBlacklist() throws YarnException, IOException {
|
||||||
|
AMRMClientImpl<ContainerRequest> amClient = null;
|
||||||
|
try {
|
||||||
|
// start am rm client
|
||||||
|
amClient =
|
||||||
|
(AMRMClientImpl<ContainerRequest>) AMRMClient
|
||||||
|
.<ContainerRequest> createAMRMClient();
|
||||||
|
amClient.init(conf);
|
||||||
|
amClient.start();
|
||||||
|
amClient.registerApplicationMaster("Host", 10000, "");
|
||||||
|
String[] nodes = {"node1", "node2", "node3"};
|
||||||
|
|
||||||
|
// Add nodes[0] and nodes[1]
|
||||||
|
List<String> nodeList01 = new ArrayList<String>();
|
||||||
|
nodeList01.add(nodes[0]);
|
||||||
|
nodeList01.add(nodes[1]);
|
||||||
|
amClient.updateBlacklist(nodeList01, null);
|
||||||
|
assertEquals(amClient.blacklistAdditions.size(),2);
|
||||||
|
assertEquals(amClient.blacklistRemovals.size(),0);
|
||||||
|
|
||||||
|
// Add nodes[0] again, verify it is not added duplicated.
|
||||||
|
List<String> nodeList02 = new ArrayList<String>();
|
||||||
|
nodeList02.add(nodes[0]);
|
||||||
|
nodeList02.add(nodes[2]);
|
||||||
|
amClient.updateBlacklist(nodeList02, null);
|
||||||
|
assertEquals(amClient.blacklistAdditions.size(),3);
|
||||||
|
assertEquals(amClient.blacklistRemovals.size(),0);
|
||||||
|
|
||||||
|
// Add nodes[1] and nodes[2] to removal list,
|
||||||
|
// Verify addition list remove these two nodes.
|
||||||
|
List<String> nodeList12 = new ArrayList<String>();
|
||||||
|
nodeList12.add(nodes[1]);
|
||||||
|
nodeList12.add(nodes[2]);
|
||||||
|
amClient.updateBlacklist(null, nodeList12);
|
||||||
|
assertEquals(amClient.blacklistAdditions.size(),1);
|
||||||
|
assertEquals(amClient.blacklistRemovals.size(),2);
|
||||||
|
|
||||||
|
// Add nodes[1] again to addition list,
|
||||||
|
// Verify removal list will remove this node.
|
||||||
|
List<String> nodeList1 = new ArrayList<String>();
|
||||||
|
nodeList1.add(nodes[1]);
|
||||||
|
amClient.updateBlacklist(nodeList1, null);
|
||||||
|
assertEquals(amClient.blacklistAdditions.size(),2);
|
||||||
|
assertEquals(amClient.blacklistRemovals.size(),1);
|
||||||
|
} finally {
|
||||||
|
if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
|
||||||
|
amClient.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getAllocatedContainersNumber(
|
||||||
|
AMRMClientImpl<ContainerRequest> amClient, int iterationsLeft)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
int allocatedContainerCount = 0;
|
||||||
|
while (iterationsLeft-- > 0) {
|
||||||
|
Log.info(" == alloc " + allocatedContainerCount + " it left " + iterationsLeft);
|
||||||
|
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||||
|
assertTrue(amClient.ask.size() == 0);
|
||||||
|
assertTrue(amClient.release.size() == 0);
|
||||||
|
|
||||||
|
assertTrue(nodeCount == amClient.getClusterNodeCount());
|
||||||
|
allocatedContainerCount += allocResponse.getAllocatedContainers().size();
|
||||||
|
|
||||||
|
if(allocatedContainerCount == 0) {
|
||||||
|
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||||
|
sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allocatedContainerCount;
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout=60000)
|
@Test (timeout=60000)
|
||||||
public void testAMRMClient() throws YarnException, IOException {
|
public void testAMRMClient() throws YarnException, IOException {
|
||||||
AMRMClient<ContainerRequest> amClient = null;
|
AMRMClient<ContainerRequest> amClient = null;
|
||||||
|
|
|
@ -125,7 +125,7 @@ public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBlacklistAdditions(List<String> resourceNames) {
|
public void setBlacklistAdditions(List<String> resourceNames) {
|
||||||
if (resourceNames == null) {
|
if (resourceNames == null || resourceNames.isEmpty()) {
|
||||||
if (this.blacklistAdditions != null) {
|
if (this.blacklistAdditions != null) {
|
||||||
this.blacklistAdditions.clear();
|
this.blacklistAdditions.clear();
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ public class ResourceBlacklistRequestPBImpl extends ResourceBlacklistRequest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setBlacklistRemovals(List<String> resourceNames) {
|
public void setBlacklistRemovals(List<String> resourceNames) {
|
||||||
if (resourceNames == null) {
|
if (resourceNames == null || resourceNames.isEmpty()) {
|
||||||
if (this.blacklistRemovals != null) {
|
if (this.blacklistRemovals != null) {
|
||||||
this.blacklistRemovals.clear();
|
this.blacklistRemovals.clear();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue