MAPREDUCE-6871. Allow users to specify racks and nodes for strict locality for AMs (rkanter)

This commit is contained in:
Robert Kanter 2017-04-21 16:12:01 -07:00
parent 5078df7be3
commit 3721cfe1fb
3 changed files with 279 additions and 26 deletions

View File

@ -91,6 +91,12 @@ public interface MRJobConfig {
*/ */
public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression"; public static final String REDUCE_NODE_LABEL_EXP = "mapreduce.reduce.node-label-expression";
/**
* Specify strict locality on a comma-separated list of racks and/or nodes.
* Syntax: /rack or /rack/node or node (assumes /default-rack)
*/
public static final String AM_STRICT_LOCALITY = "mapreduce.job.am.strict-locality";
public static final String RESERVATION_ID = "mapreduce.job.reservation.id"; public static final String RESERVATION_ID = "mapreduce.job.reservation.id";
public static final String JOB_TAGS = "mapreduce.job.tags"; public static final String JOB_TAGS = "mapreduce.job.tags";

View File

@ -22,13 +22,14 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Vector; import java.util.Vector;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -101,6 +102,25 @@ public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class); private static final Log LOG = LogFactory.getLog(YARNRunner.class);
private static final String RACK_GROUP = "rack";
private static final String NODE_IF_RACK_GROUP = "node1";
private static final String NODE_IF_NO_RACK_GROUP = "node2";
/**
* Matches any of the following patterns with capturing groups:
* <ul>
* <li>/rack</li>
* <li>/rack/node</li>
* <li>node (assumes /default-rack)</li>
* </ul>
* The groups can be retrieved using the RACK_GROUP, NODE_IF_RACK_GROUP,
* and/or NODE_IF_NO_RACK_GROUP group keys.
*/
private static final Pattern RACK_NODE_PATTERN =
Pattern.compile(
String.format("(?<%s>[^/]+?)|(?<%s>/[^/]+?)(?:/(?<%s>[^/]+?))?",
NODE_IF_NO_RACK_GROUP, RACK_GROUP, NODE_IF_RACK_GROUP));
private final static RecordFactory recordFactory = RecordFactoryProvider private final static RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null); .getRecordFactory(null);
@ -503,20 +523,6 @@ public class YARNRunner implements ClientProtocol {
throws IOException { throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId(); ApplicationId applicationId = resMgrDelegate.getApplicationId();
// Setup resource requirements
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)
);
capability.setVirtualCores(
conf.getInt(
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
)
);
LOG.debug("AppMaster capability = " + capability);
// Setup LocalResources // Setup LocalResources
Map<String, LocalResource> localResources = Map<String, LocalResource> localResources =
setupLocalResources(jobConf, jobSubmitDir); setupLocalResources(jobConf, jobSubmitDir);
@ -577,21 +583,18 @@ public class YARNRunner implements ClientProtocol {
appContext.setMaxAppAttempts( appContext.setMaxAppAttempts(
conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
appContext.setResource(capability);
// set labels for the AM container request if present // Setup the AM ResourceRequests
List<ResourceRequest> amResourceRequests = generateResourceRequests();
appContext.setAMContainerResourceRequests(amResourceRequests);
// set labels for the AM container requests if present
String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP); String amNodelabelExpression = conf.get(MRJobConfig.AM_NODE_LABEL_EXP);
if (null != amNodelabelExpression if (null != amNodelabelExpression
&& amNodelabelExpression.trim().length() != 0) { && amNodelabelExpression.trim().length() != 0) {
ResourceRequest amResourceRequest = for (ResourceRequest amResourceRequest : amResourceRequests) {
recordFactory.newRecordInstance(ResourceRequest.class); amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
amResourceRequest.setPriority(AM_CONTAINER_PRIORITY); }
amResourceRequest.setResourceName(ResourceRequest.ANY);
amResourceRequest.setCapability(capability);
amResourceRequest.setNumContainers(1);
amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
appContext.setAMContainerResourceRequests(
Collections.singletonList(amResourceRequest));
} }
// set labels for the Job containers // set labels for the Job containers
appContext.setNodeLabelExpression(jobConf appContext.setNodeLabelExpression(jobConf
@ -616,6 +619,83 @@ public class YARNRunner implements ClientProtocol {
return appContext; return appContext;
} }
private List<ResourceRequest> generateResourceRequests() throws IOException {
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(
conf.getInt(
MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
)
);
capability.setVirtualCores(
conf.getInt(
MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
)
);
if (LOG.isDebugEnabled()) {
LOG.debug("AppMaster capability = " + capability);
}
List<ResourceRequest> amResourceRequests = new ArrayList<>();
// Always have an ANY request
ResourceRequest amAnyResourceRequest =
createAMResourceRequest(ResourceRequest.ANY, capability);
Map<String, ResourceRequest> rackRequests = new HashMap<>();
amResourceRequests.add(amAnyResourceRequest);
Collection<String> amStrictResources = conf.getStringCollection(
MRJobConfig.AM_STRICT_LOCALITY);
for (String amStrictResource : amStrictResources) {
amAnyResourceRequest.setRelaxLocality(false);
Matcher matcher = RACK_NODE_PATTERN.matcher(amStrictResource);
if (matcher.matches()) {
String nodeName;
String rackName = matcher.group(RACK_GROUP);
if (rackName == null) {
rackName = "/default-rack";
nodeName = matcher.group(NODE_IF_NO_RACK_GROUP);
} else {
nodeName = matcher.group(NODE_IF_RACK_GROUP);
}
ResourceRequest amRackResourceRequest = rackRequests.get(rackName);
if (amRackResourceRequest == null) {
amRackResourceRequest = createAMResourceRequest(rackName, capability);
amResourceRequests.add(amRackResourceRequest);
rackRequests.put(rackName, amRackResourceRequest);
}
if (nodeName != null) {
amRackResourceRequest.setRelaxLocality(false);
ResourceRequest amNodeResourceRequest =
createAMResourceRequest(nodeName, capability);
amResourceRequests.add(amNodeResourceRequest);
}
} else {
String errMsg =
"Invalid resource name: " + amStrictResource + " specified.";
LOG.warn(errMsg);
throw new IOException(errMsg);
}
}
if (LOG.isDebugEnabled()) {
for (ResourceRequest amResourceRequest : amResourceRequests) {
LOG.debug("ResourceRequest: resource = "
+ amResourceRequest.getResourceName() + ", locality = "
+ amResourceRequest.getRelaxLocality());
}
}
return amResourceRequests;
}
private ResourceRequest createAMResourceRequest(String resource,
Resource capability) {
ResourceRequest resourceRequest =
recordFactory.newRecordInstance(ResourceRequest.class);
resourceRequest.setPriority(AM_CONTAINER_PRIORITY);
resourceRequest.setResourceName(resource);
resourceRequest.setCapability(capability);
resourceRequest.setNumContainers(1);
resourceRequest.setRelaxLocality(true);
return resourceRequest;
}
private void setTokenRenewerConf(ContainerLaunchContext context, private void setTokenRenewerConf(ContainerLaunchContext context,
Configuration conf, String regex) throws IOException { Configuration conf, String regex) throws IOException {
DataOutputBuffer dob = new DataOutputBuffer(); DataOutputBuffer dob = new DataOutputBuffer();

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
@ -40,6 +41,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -93,6 +95,8 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
@ -575,6 +579,169 @@ public class TestYARNRunner {
.getNodeLabelExpression(), "highMem"); .getNodeLabelExpression(), "highMem");
} }
@Test
public void testResourceRequestLocalityAny() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, true);
verifyResourceRequestLocality(null, null, amAnyResourceRequest);
verifyResourceRequestLocality(null, "label1", amAnyResourceRequest);
}
@Test
public void testResourceRequestLocalityRack() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", true);
verifyResourceRequestLocality("/rack1", null, amAnyResourceRequest,
amRackResourceRequest);
verifyResourceRequestLocality("/rack1", "label1", amAnyResourceRequest,
amRackResourceRequest);
}
@Test
public void testResourceRequestLocalityNode() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
verifyResourceRequestLocality("/rack1/node1", null, amAnyResourceRequest,
amRackResourceRequest, amNodeResourceRequest);
verifyResourceRequestLocality("/rack1/node1", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
}
@Test
public void testResourceRequestLocalityNodeDefaultRack() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/default-rack", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
verifyResourceRequestLocality("node1", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
verifyResourceRequestLocality("node1", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest);
}
@Test
public void testResourceRequestLocalityMultipleNodes() throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
ResourceRequest amNode2ResourceRequest =
createResourceRequest("node2", true);
verifyResourceRequestLocality("/rack1/node1,/rack1/node2", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amNode2ResourceRequest);
verifyResourceRequestLocality("/rack1/node1,/rack1/node2", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amNode2ResourceRequest);
}
@Test
public void testResourceRequestLocalityMultipleNodesDifferentRack()
throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
ResourceRequest amRack2ResourceRequest =
createResourceRequest("/rack2", false);
ResourceRequest amNode2ResourceRequest =
createResourceRequest("node2", true);
verifyResourceRequestLocality("/rack1/node1,/rack2/node2", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
verifyResourceRequestLocality("/rack1/node1,/rack2/node2", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
}
@Test
public void testResourceRequestLocalityMultipleNodesDefaultRack()
throws Exception {
ResourceRequest amAnyResourceRequest =
createResourceRequest(ResourceRequest.ANY, false);
ResourceRequest amRackResourceRequest =
createResourceRequest("/rack1", false);
ResourceRequest amNodeResourceRequest =
createResourceRequest("node1", true);
ResourceRequest amRack2ResourceRequest =
createResourceRequest("/default-rack", false);
ResourceRequest amNode2ResourceRequest =
createResourceRequest("node2", true);
verifyResourceRequestLocality("/rack1/node1,node2", null,
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
verifyResourceRequestLocality("/rack1/node1,node2", "label1",
amAnyResourceRequest, amRackResourceRequest, amNodeResourceRequest,
amRack2ResourceRequest, amNode2ResourceRequest);
}
@Test
public void testResourceRequestLocalityInvalid() throws Exception {
try {
verifyResourceRequestLocality("rack/node1", null,
new ResourceRequest[]{});
fail("Should have failed due to invalid resource but did not");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Invalid resource name"));
}
try {
verifyResourceRequestLocality("/rack/node1/blah", null,
new ResourceRequest[]{});
fail("Should have failed due to invalid resource but did not");
} catch (IOException ioe) {
assertTrue(ioe.getMessage().contains("Invalid resource name"));
}
}
private void verifyResourceRequestLocality(String strictResource,
String label, ResourceRequest... expectedReqs) throws Exception {
JobConf jobConf = new JobConf();
if (strictResource != null) {
jobConf.set(MRJobConfig.AM_STRICT_LOCALITY, strictResource);
}
if (label != null) {
jobConf.set(MRJobConfig.AM_NODE_LABEL_EXP, label);
for (ResourceRequest expectedReq : expectedReqs) {
expectedReq.setNodeLabelExpression(label);
}
}
YARNRunner yarnRunner = new YARNRunner(jobConf);
ApplicationSubmissionContext appSubCtx =
buildSubmitContext(yarnRunner, jobConf);
assertEquals(Arrays.asList(expectedReqs),
appSubCtx.getAMContainerResourceRequests());
}
private ResourceRequest createResourceRequest(String name,
boolean relaxLocality) {
Resource capability = recordFactory.newRecordInstance(Resource.class);
capability.setMemorySize(MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
capability.setVirtualCores(MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
ResourceRequest req =
recordFactory.newRecordInstance(ResourceRequest.class);
req.setPriority(YARNRunner.AM_CONTAINER_PRIORITY);
req.setResourceName(name);
req.setCapability(capability);
req.setNumContainers(1);
req.setRelaxLocality(relaxLocality);
return req;
}
@Test @Test
public void testAMStandardEnvWithDefaultLibPath() throws Exception { public void testAMStandardEnvWithDefaultLibPath() throws Exception {
testAMStandardEnv(false); testAMStandardEnv(false);