Compare commits
4 Commits
Author | SHA1 | Date |
---|---|---|
Devaraj K | f49cec888a | |
Naganarasimha | e274d508ff | |
Naganarasimha | 3fff8bddb2 | |
Naganarasimha | bfc09a2a7b | |
@@ -111,7 +111,6 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;

@@ -973,7 +972,7 @@ public class MRAppMaster extends CompositeService {
           , containerID);
     } else {
       this.containerAllocator = new RMContainerAllocator(
-          this.clientService, this.context, preemptionPolicy);
+          this.clientService, this.context, preemptionPolicy, dispatcher);
     }
     ((Service)this.containerAllocator).init(getConfig());
     ((Service)this.containerAllocator).start();

@@ -1156,7 +1155,7 @@ public class MRAppMaster extends CompositeService {

     @Override
     public Set<String> getBlacklistedNodes() {
-      return ((RMContainerRequestor) containerAllocator).getBlacklistedNodes();
+      return ((RMContainerAllocator) containerAllocator).getBlacklistedNodes();
     }

     @Override

@@ -28,6 +28,7 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {

   private final Container container;
   private final Map<ApplicationAccessType, String> applicationACLs;
+  private int shufflePort = -1;

   public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
       Container container, Map<ApplicationAccessType, String> applicationACLs) {

@@ -36,6 +37,14 @@ public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
     this.applicationACLs = applicationACLs;
   }

+  public int getShufflePort() {
+    return shufflePort;
+  }
+
+  public void setShufflePort(int shufflePort) {
+    this.shufflePort = shufflePort;
+  }
+
   public Container getContainer() {
     return this.container;
   }

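The shufflePort field added above defaults to -1, which the state machine treats as "no reused container": ContainerAssignedTransition in the next diff launches a fresh container in that case, and otherwise skips the launch and moves the attempt straight to RUNNING. A minimal sketch of that contract, not part of these commits; the nulls stand in for a real TaskAttemptId and Container, and 13562 is an arbitrary example port:

package org.example.mrreuse;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;

public final class ShufflePortSentinelDemo {
  public static void main(String[] args) {
    Map<ApplicationAccessType, String> acls = new HashMap<>();
    // Nulls stand in for a real TaskAttemptId and Container; this only
    // demonstrates the sentinel, nothing is dispatched.
    TaskAttemptContainerAssignedEvent event =
        new TaskAttemptContainerAssignedEvent(null, null, acls);
    System.out.println(event.getShufflePort()); // -1: fresh container, normal launch
    event.setShufflePort(13562);                // reused container, JVM already up
    System.out.println(event.getShufflePort()); // 13562: attempt goes straight to RUNNING
  }
}
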
@@ -265,7 +265,8 @@ public abstract class TaskAttemptImpl implements

      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
-         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+         EnumSet.of(TaskAttemptStateInternal.ASSIGNED,
+             TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_ASSIGNED,
          new ContainerAssignedTransition())
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(

@@ -1876,13 +1877,14 @@ public abstract class TaskAttemptImpl implements
   }

   private static class ContainerAssignedTransition implements
-      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+      MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
+      TaskAttemptStateInternal> {
     @SuppressWarnings({ "unchecked" })
     @Override
-    public void transition(final TaskAttemptImpl taskAttempt,
-        TaskAttemptEvent event) {
-      final TaskAttemptContainerAssignedEvent cEvent =
-        (TaskAttemptContainerAssignedEvent) event;
+    public TaskAttemptStateInternal transition(
+        final TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      final TaskAttemptContainerAssignedEvent cEvent =
+          (TaskAttemptContainerAssignedEvent) event;
       Container container = cEvent.getContainer();
       taskAttempt.container = container;
       // this is a _real_ Task (classic Hadoop mapred flavor):

@@ -1895,20 +1897,26 @@ public abstract class TaskAttemptImpl implements
           taskAttempt.remoteTask, taskAttempt.jvmID);

       taskAttempt.computeRackAndLocality();

-      //launch the container
-      //create the container object to be launched for a given Task attempt
-      ContainerLaunchContext launchContext = createContainerLaunchContext(
-          cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
-          taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
-          taskAttempt.taskAttemptListener, taskAttempt.credentials);
-      taskAttempt.eventHandler
-          .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
-              launchContext, container, taskAttempt.remoteTask));
-
-      // send event to speculator that our container needs are satisfied
-      taskAttempt.eventHandler.handle
-          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
+      if (cEvent.getShufflePort() == -1) {
+        // launch the container
+        // create the container object to be launched for a given Task attempt
+        ContainerLaunchContext launchContext = createContainerLaunchContext(
+            cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
+            taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
+            taskAttempt.taskAttemptListener, taskAttempt.credentials);
+        taskAttempt.eventHandler
+            .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
+                launchContext, container, taskAttempt.remoteTask));
+
+        // send event to speculator that our container needs are satisfied
+        taskAttempt.eventHandler
+            .handle(new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
+        return TaskAttemptStateInternal.ASSIGNED;
+      } else {
+        taskAttempt.onContainerLaunch(cEvent.getShufflePort());
+        return TaskAttemptStateInternal.RUNNING;
+      }
     }
   }

@@ -1982,7 +1990,6 @@ public abstract class TaskAttemptImpl implements

   private static class LaunchedContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
-    @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent evnt) {

@@ -1990,34 +1997,34 @@ public abstract class TaskAttemptImpl implements
       TaskAttemptContainerLaunchedEvent event =
           (TaskAttemptContainerLaunchedEvent) evnt;

-      //set the launch time
-      taskAttempt.launchTime = taskAttempt.clock.getTime();
-      taskAttempt.shufflePort = event.getShufflePort();
-
-      // register it to TaskAttemptListener so that it can start monitoring it.
-      taskAttempt.taskAttemptListener
-          .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
-
-      //TODO Resolve to host / IP in case of a local address.
-      InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
-          NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
-      taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
-      taskAttempt.httpPort = nodeHttpInetAddr.getPort();
-      taskAttempt.sendLaunchedEvents();
-      taskAttempt.eventHandler.handle
-          (new SpeculatorEvent
-              (taskAttempt.attemptId, true, taskAttempt.clock.getTime()));
-      //make remoteTask reference as null as it is no more needed
-      //and free up the memory
-      taskAttempt.remoteTask = null;
-
-      //tell the Task that attempt has started
-      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
-          taskAttempt.attemptId,
-          TaskEventType.T_ATTEMPT_LAUNCHED));
+      taskAttempt.onContainerLaunch(event.getShufflePort());
     }
   }

+  @SuppressWarnings("unchecked")
+  private void onContainerLaunch(int shufflePortParam) {
+    // set the launch time
+    launchTime = clock.getTime();
+    this.shufflePort = shufflePortParam;
+
+    // register it to TaskAttemptListener so that it can start monitoring it.
+    taskAttemptListener.registerLaunchedTask(attemptId, jvmID);
+    // TODO Resolve to host / IP in case of a local address.
+    InetSocketAddress nodeHttpInetAddr = // TODO: Costly to create sock-addr?
+        NetUtils.createSocketAddr(container.getNodeHttpAddress());
+    trackerName = nodeHttpInetAddr.getHostName();
+    httpPort = nodeHttpInetAddr.getPort();
+    sendLaunchedEvents();
+    eventHandler.handle(new SpeculatorEvent(attemptId, true, clock.getTime()));
+    // make remoteTask reference as null as it is no more needed
+    // and free up the memory
+    remoteTask = null;
+
+    // tell the Task that attempt has started
+    eventHandler.handle(
+        new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED));
+  }
+
   private static class CommitPendingTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @SuppressWarnings("unchecked")

@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event class for ContainerRequestor.
+ */
+public class ContainerAvailableEvent
+    extends AbstractEvent<RMContainerReuseRequestor.EventType> {
+
+  private final TaskAttemptId taskAttemptId;
+  private final Container container;
+
+  public ContainerAvailableEvent(RMContainerReuseRequestor.EventType eventType,
+      TaskAttemptId taskAttemptId, Container container) {
+    super(eventType);
+    this.taskAttemptId = taskAttemptId;
+    this.container = container;
+  }
+
+  public TaskAttemptId getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+}

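ContainerAvailableEvent is the hand-off from the allocator side to the reuse requestor. A hypothetical helper, not part of these commits (the class and method names are made up), showing how a freed container could be published; RMContainerAllocator.serviceInit further down registers RMContainerReuseRequestor as the handler for this event type:

package org.example.mrreuse;

import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAvailableEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.event.Dispatcher;

public final class ContainerAvailablePublisher {
  // Publishes a container that has just become free; the reuse requestor
  // (registered on this event type) decides whether to keep it for another
  // task attempt or release it back to the ResourceManager.
  public static void publish(Dispatcher dispatcher, TaskAttemptId attemptId,
      Container container) {
    dispatcher.getEventHandler().handle(new ContainerAvailableEvent(
        RMContainerReuseRequestor.EventType.CONTAINER_AVAILABLE,
        attemptId, container));
  }
}
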
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * Interface for ContainerRequestor.
+ */
+public interface ContainerRequestor {
+
+  AllocateResponse makeRemoteRequest()
+      throws YarnRuntimeException, YarnException, IOException;
+
+  void addContainerReq(ContainerRequest request);
+
+  void decContainerReq(ContainerRequest request);
+
+  void containerAssigned(Container allocated, ContainerRequest assigned,
+      Map<ApplicationAccessType, String> acls);
+
+  void release(ContainerId containerId);
+
+  boolean isNodeBlacklisted(String hostname);
+
+  Resource getAvailableResources();
+
+  void containerFailedOnHost(String hostName);
+
+  ContainerRequest filterRequest(ContainerRequest orig);
+}

@@ -53,11 +53,11 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
 import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringInterner;

@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;

@@ -94,7 +95,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
-public class RMContainerAllocator extends RMContainerRequestor
+public class RMContainerAllocator extends RMCommunicator
     implements ContainerAllocator {

   static final Logger LOG = LoggerFactory.getLogger(RMContainerAllocator.class);

@@ -114,6 +115,9 @@ public class RMContainerAllocator extends RMContainerRequestor
   private Thread eventHandlingThread;
   private final AtomicBoolean stopped;

+  @VisibleForTesting
+  protected RMContainerRequestor containerRequestor;
+
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
     PRIORITY_FAST_FAIL_MAP.setPriority(5);

@@ -194,10 +198,13 @@ public class RMContainerAllocator extends RMContainerRequestor

   private String reduceNodeLabelExpression;

+  private Dispatcher dispatcher;
+
   public RMContainerAllocator(ClientService clientService, AppContext context,
-      AMPreemptionPolicy preemptionPolicy) {
+      AMPreemptionPolicy preemptionPolicy, Dispatcher dispatcher) {
     super(clientService, context);
     this.preemptionPolicy = preemptionPolicy;
+    this.dispatcher = dispatcher;
     this.stopped = new AtomicBoolean(false);
     this.clock = context.getClock();
     this.assignedRequests = createAssignedRequests();

@@ -207,6 +214,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     return new AssignedRequests();
   }

+  @SuppressWarnings("unchecked")
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);

@@ -242,6 +250,15 @@ public class RMContainerAllocator extends RMContainerRequestor
         MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT));
     LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() +
         "% of the mappers will be scheduled using OPPORTUNISTIC containers");
+    if (conf.getBoolean(MRJobConfig.MR_AM_CONTAINER_REUSE_ENABLED,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_ENABLED)) {
+      containerRequestor = new RMContainerReuseRequestor(eventHandler, this);
+      dispatcher.register(RMContainerReuseRequestor.EventType.class,
+          (RMContainerReuseRequestor) containerRequestor);
+    } else {
+      containerRequestor = new RMContainerRequestor(eventHandler, this);
+    }
+    containerRequestor.init(conf);
   }

   @Override

@@ -398,8 +415,8 @@ public class RMContainerAllocator extends RMContainerRequestor
           removed = true;
           assignedRequests.remove(aId);
           containersReleased++;
-          pendingRelease.add(containerId);
-          release(containerId);
+          containerRequestor.pendingRelease.add(containerId);
+          containerRequestor.release(containerId);
         }
       }
       if (!removed) {

@@ -411,7 +428,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
       ContainerFailedEvent fEv = (ContainerFailedEvent) event;
       String host = getHost(fEv.getContMgrAddress());
-      containerFailedOnHost(host);
+      containerRequestor.containerFailedOnHost(host);
       // propagate failures to preemption policy to discard checkpoints for
       // failed tasks
       preemptionPolicy.handleFailedContainer(event.getAttemptID());

@@ -569,13 +586,14 @@ public class RMContainerAllocator extends RMContainerRequestor
     // there are enough resources for a mapper to run. This is calculated by
     // excluding scheduled reducers from headroom and comparing it against
    // resources required to run one mapper.
-    Resource scheduledReducesResource = Resources.multiply(
-        reduceResourceRequest, scheduledRequests.reduces.size());
-    Resource availableResourceForMap =
-        Resources.subtract(getAvailableResources(), scheduledReducesResource);
-    if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
-        mapResourceRequest, getSchedulerResourceTypes()) > 0) {
-      // Enough room to run a mapper
+    Resource scheduledReducesResource = Resources
+        .multiply(reduceResourceRequest, scheduledRequests.reduces.size());
+    Resource availableResourceForMap = Resources.subtract(
+        containerRequestor.getAvailableResources(), scheduledReducesResource);
+    if (ResourceCalculatorUtils.computeAvailableContainers(
+        availableResourceForMap, mapResourceRequest,
+        getSchedulerResourceTypes()) > 0) {
+      // Enough room to run a mapper
       return false;
     }

@@ -651,7 +669,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     }

     // get available resources for this job
-    Resource headRoom = getAvailableResources();
+    Resource headRoom = containerRequestor.getAvailableResources();

     LOG.info("Recalculating schedule, headroom=" + headRoom);

@@ -782,7 +800,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     applyConcurrentTaskLimits();

     // will be null the first time
-    Resource headRoom = Resources.clone(getAvailableResources());
+    Resource headRoom = Resources
+        .clone(containerRequestor.getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS

@@ -790,7 +809,7 @@ public class RMContainerAllocator extends RMContainerRequestor
      * to contact the RM.
      */
     try {
-      response = makeRemoteRequest();
+      response = containerRequestor.makeRemoteRequest();
       // Reset retry count if no exception occurred.
       retrystartTime = System.currentTimeMillis();
     } catch (ApplicationAttemptNotFoundException e ) {

@@ -805,9 +824,9 @@ public class RMContainerAllocator extends RMContainerRequestor
       LOG.info("ApplicationMaster is out of sync with ResourceManager,"
           + " hence resync and send outstanding requests.");
       // RM may have restarted, re-register with RM.
-      lastResponseID = 0;
+      containerRequestor.lastResponseID = 0;
       register();
-      addOutstandingRequestOnResync();
+      containerRequestor.addOutstandingRequestOnResync();
       return null;
     } catch (InvalidLabelResourceRequestException e) {
       // If Invalid label exception is received means the requested label doesnt

@@ -833,7 +852,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       // continue to attempt to contact the RM.
       throw e;
     }
-    Resource newHeadRoom = getAvailableResources();
+    Resource newHeadRoom = containerRequestor.getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {

@@ -874,7 +893,7 @@ public class RMContainerAllocator extends RMContainerRequestor
     }

     //Called on each allocation. Will know about newly blacklisted/added hosts.
-    computeIgnoreBlacklisting();
+    containerRequestor.computeIgnoreBlacklisting();

     handleUpdatedNodes(response);
     handleJobPriorityChange(response);

@@ -900,7 +919,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       LOG.error("Container complete event for unknown container "
           + container.getContainerId());
     } else {
-      pendingRelease.remove(container.getContainerId());
+      containerRequestor.pendingRelease.remove(container.getContainerId());
       assignedRequests.remove(attemptID);

       // Send the diagnostics

@@ -927,11 +946,12 @@ public class RMContainerAllocator extends RMContainerRequestor
       int normalMapRequestLimit = Math.min(
           maxRequestedMaps - failedMapRequestLimit,
           numScheduledMaps - numScheduledFailMaps);
-      setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
-          failedMapRequestLimit);
-      setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
-      setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest,
-          normalMapRequestLimit);
+      containerRequestor.setRequestLimit(PRIORITY_FAST_FAIL_MAP,
+          mapResourceRequest, failedMapRequestLimit);
+      containerRequestor.setRequestLimit(PRIORITY_MAP, mapResourceRequest,
+          normalMapRequestLimit);
+      containerRequestor.setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP,
+          mapResourceRequest, normalMapRequestLimit);
     }

     int numScheduledReduces = scheduledRequests.reduces.size();

@@ -941,7 +961,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         maxRunningReduces - assignedRequests.reduces.size());
     int reduceRequestLimit = Math.min(maxRequestedReduces,
         numScheduledReduces);
-    setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
+    containerRequestor.setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
         reduceRequestLimit);
     }
   }

@@ -1036,7 +1056,7 @@ public class RMContainerAllocator extends RMContainerRequestor

   @Private
   public Resource getResourceLimit() {
-    Resource headRoom = getAvailableResources();
+    Resource headRoom = containerRequestor.getAvailableResources();
     Resource assignedMapResource =
         Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
     Resource assignedReduceResource =

@@ -1087,7 +1107,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (req == null) {
         return false;
       } else {
-        decContainerReq(req);
+        containerRequestor.decContainerReq(req);
         return true;
       }
     }

@@ -1097,7 +1117,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       if (it.hasNext()) {
         Entry<TaskAttemptId, ContainerRequest> entry = it.next();
         it.remove();
-        decContainerReq(entry.getValue());
+        containerRequestor.decContainerReq(entry.getValue());
         return entry.getValue();
       }
       return null;

@@ -1114,14 +1134,15 @@ public class RMContainerAllocator extends RMContainerRequestor
         LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
         // If its an earlier Failed attempt, do not retry as OPPORTUNISTIC
         maps.put(event.getAttemptID(), request);
-        addContainerReq(request);
+        containerRequestor.addContainerReq(request);
       } else {
         if (mapsMod100 < numOpportunisticMapsPercent) {
           request =
               new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP,
                   mapNodeLabelExpression);
           maps.put(event.getAttemptID(), request);
-          addOpportunisticResourceRequest(request.priority, request.capability);
+          containerRequestor.addOpportunisticResourceRequest(request.priority,
+              request.capability);
         } else {
           request =
               new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);

@@ -1148,7 +1169,7 @@ public class RMContainerAllocator extends RMContainerRequestor
           }
         }
         maps.put(event.getAttemptID(), request);
-        addContainerReq(request);
+        containerRequestor.addContainerReq(request);
       }
       mapsMod100++;
       mapsMod100 %= 100;

@@ -1158,7 +1179,7 @@ public class RMContainerAllocator extends RMContainerRequestor

     void addReduce(ContainerRequest req) {
       reduces.put(req.attemptID, req);
-      addContainerReq(req);
+      containerRequestor.addContainerReq(req);
     }

     // this method will change the list of allocatedContainers.

@@ -1223,7 +1244,7 @@ public class RMContainerAllocator extends RMContainerRequestor
         // do not assign if allocated container is on a
         // blacklisted host
         String allocatedHost = allocated.getNodeId().getHost();
-        if (isNodeBlacklisted(allocatedHost)) {
+        if (containerRequestor.isNodeBlacklisted(allocatedHost)) {
           // we need to request for a new container
           // and release the current one
           LOG.info("Got allocated container on a blacklisted "

@@ -1237,9 +1258,9 @@ public class RMContainerAllocator extends RMContainerRequestor
           if (toBeReplacedReq != null) {
             LOG.info("Placing a new container request for task attempt "
                 + toBeReplacedReq.attemptID);
-            ContainerRequest newReq =
-                getFilteredContainerRequest(toBeReplacedReq);
-            decContainerReq(toBeReplacedReq);
+            ContainerRequest newReq = containerRequestor
+                .filterRequest(toBeReplacedReq);
+            containerRequestor.decContainerReq(toBeReplacedReq);
             if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
                 TaskType.MAP) {
               maps.put(newReq.attemptID, newReq);

@@ -1247,7 +1268,7 @@ public class RMContainerAllocator extends RMContainerRequestor
             else {
               reduces.put(newReq.attemptID, newReq);
             }
-            addContainerReq(newReq);
+            containerRequestor.addContainerReq(newReq);
           }
           else {
             LOG.info("Could not map allocated container to a valid request."

@@ -1276,11 +1297,8 @@ public class RMContainerAllocator extends RMContainerRequestor
     private void containerAssigned(Container allocated,
         ContainerRequest assigned) {
-      // Update resource requests
-      decContainerReq(assigned);
-
-      // send the container-assigned event to task attempt
-      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-          assigned.attemptID, allocated, applicationACLs));
+      containerRequestor.containerAssigned(allocated, assigned,
+          applicationACLs);

       assignedRequests.add(allocated, assigned.attemptID);

@@ -1293,8 +1311,8 @@ public class RMContainerAllocator extends RMContainerRequestor

     private void containerNotAssigned(Container allocated) {
       containersReleased++;
-      pendingRelease.add(allocated.getId());
-      release(allocated.getId());
+      containerRequestor.pendingRelease.add(allocated.getId());
+      containerRequestor.release(allocated.getId());
     }

     private ContainerRequest assignWithoutLocality(Container allocated) {

@@ -1659,4 +1677,18 @@ public class RMContainerAllocator extends RMContainerRequestor

   }

+  public Set<String> getBlacklistedNodes() {
+    return containerRequestor.getBlacklistedNodes();
+  }
+
+  public RMContainerRequestor getContainerRequestor() {
+    return containerRequestor;
+  }
+
+  public void resetContainerForReuse(ContainerId containerId) {
+    TaskAttemptId attemptId = assignedRequests.get(containerId);
+    if (attemptId != null) {
+      assignedRequests.remove(attemptId);
+    }
+  }
 }

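Container reuse is opt-in: the serviceInit hunk above picks RMContainerReuseRequestor over the plain RMContainerRequestor based on a new configuration key. A sketch of flipping that switch, assuming the MR_AM_CONTAINER_REUSE_ENABLED key this change set adds to MRJobConfig:

package org.example.mrreuse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public final class EnableContainerReuse {
  public static Configuration withContainerReuse() {
    Configuration conf = new Configuration();
    // When true, RMContainerAllocator.serviceInit builds an
    // RMContainerReuseRequestor and registers it with the dispatcher;
    // otherwise it falls back to the plain RMContainerRequestor.
    conf.setBoolean(MRJobConfig.MR_AM_CONTAINER_REUSE_ENABLED, true);
    return conf;
  }
}
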
@@ -34,25 +34,29 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
-import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.util.resource.Resources;

 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -60,7 +64,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Keeps the data structures to send container requests to RM.
  */
-public abstract class RMContainerRequestor extends RMCommunicator {
+public class RMContainerRequestor extends AbstractService
+    implements ContainerRequestor {

   private static final Logger LOG =
       LoggerFactory.getLogger(RMContainerRequestor.class);

@@ -78,7 +83,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
   //Value->Map
   //Key->Resource Capability
   //Value->ResourceRequest
-  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+  protected final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
       remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();

@@ -111,9 +116,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private final Set<String> blacklistRemovals = Collections
       .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private final ApplicationId applicationId;
+  private final RMCommunicator rmCommunicator;
+  @SuppressWarnings("rawtypes")
+  private EventHandler eventHandler;

-  public RMContainerRequestor(ClientService clientService, AppContext context) {
-    super(clientService, context);
+  @SuppressWarnings("rawtypes")
+  public RMContainerRequestor(EventHandler eventHandler,
+      RMCommunicator rmCommunicator) {
+    super(RMContainerRequestor.class.getName());
+    this.rmCommunicator = rmCommunicator;
+    applicationId = rmCommunicator.applicationId;
+    this.eventHandler = eventHandler;
   }

   @Private

@@ -194,17 +208,19 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
   }

-  protected AllocateResponse makeRemoteRequest() throws YarnException,
-      IOException {
+  @Override
+  public AllocateResponse makeRemoteRequest()
+      throws YarnException, IOException {
     applyRequestLimits();
     ResourceBlacklistRequest blacklistRequest =
         ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
             new ArrayList<String>(blacklistRemovals));
-    AllocateRequest allocateRequest =
-        AllocateRequest.newInstance(lastResponseID,
-            super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
-            new ArrayList<ContainerId>(release), blacklistRequest);
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
+    AllocateRequest allocateRequest = AllocateRequest.newInstance(
+        lastResponseID, rmCommunicator.getApplicationProgress(),
+        new ArrayList<ResourceRequest>(ask),
+        new ArrayList<ContainerId>(release), blacklistRequest);
+    AllocateResponse allocateResponse = rmCommunicator.scheduler
+        .allocate(allocateRequest);
     lastResponseID = allocateResponse.getResponseId();
     availableResources = allocateResponse.getAvailableResources();
     lastClusterNmCount = clusterNmCount;

@@ -324,7 +340,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
   }

-  protected void containerFailedOnHost(String hostName) {
+  @Override
+  public void containerFailedOnHost(String hostName) {
     if (!nodeBlacklistingEnabled) {
       return;
     }

@@ -389,11 +406,13 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     }
   }

-  protected Resource getAvailableResources() {
+  @Override
+  public Resource getAvailableResources() {
     return availableResources == null ? Resources.none() : availableResources;
   }

-  protected void addContainerReq(ContainerRequest req) {
+  @Override
+  public void addContainerReq(ContainerRequest req) {
     // Create resource requests
     for (String host : req.hosts) {
       // Data-local

@@ -414,16 +433,28 @@ public abstract class RMContainerRequestor extends RMCommunicator {
         req.nodeLabelExpression);
   }

-  protected void decContainerReq(ContainerRequest req) {
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerAssigned(Container allocated, ContainerRequest req,
+      Map<ApplicationAccessType, String> applicationACLs) {
+    decContainerReq(req);
+
+    // send the container-assigned event to task attempt
+    eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+        req.attemptID, allocated, applicationACLs));
+  }
+
+  @Override
+  public void decContainerReq(ContainerRequest req) {
     // Update resource requests
     for (String hostName : req.hosts) {
       decResourceRequest(req.priority, hostName, req.capability);
     }

     for (String rack : req.racks) {
       decResourceRequest(req.priority, rack, req.capability);
     }

     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
   }

@@ -540,18 +571,21 @@ public abstract class RMContainerRequestor extends RMCommunicator {
     ask.add(remoteRequest);
   }

-  protected void release(ContainerId containerId) {
+  @Override
+  public void release(ContainerId containerId) {
     release.add(containerId);
   }

-  protected boolean isNodeBlacklisted(String hostname) {
+  @Override
+  public boolean isNodeBlacklisted(String hostname) {
     if (!nodeBlacklistingEnabled || ignoreBlacklisting.get()) {
       return false;
     }
     return blacklistedNodes.contains(hostname);
   }

-  protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
+  @Override
+  public ContainerRequest filterRequest(ContainerRequest orig) {
     ArrayList<String> newHosts = new ArrayList<String>();
     for (String host : orig.hosts) {
       if (!isNodeBlacklisted(host)) {

@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Keeps the data for RMContainer's reuse.
+ */
+public class RMContainerReuseRequestor extends RMContainerRequestor
+    implements EventHandler<ContainerAvailableEvent> {
+  private static final Log LOG = LogFactory
+      .getLog(RMContainerReuseRequestor.class);
+
+  private Map<Container, HostInfo> containersToReuse =
+      new ConcurrentHashMap<>();
+  private Map<ContainerId, List<TaskAttemptId>> containerToTaskAttemptsMap =
+      new HashMap<ContainerId, List<TaskAttemptId>>();
+  private int containerReuseMaxMapTasks;
+  private int containerReuseMaxReduceTasks;
+  private int maxMapTaskContainers;
+  private int maxReduceTaskContainers;
+  private int noOfMapTaskContainersForReuse;
+  private int noOfReduceTaskContainersForReuse;
+  private final RMCommunicator rmCommunicator;
+  @SuppressWarnings("rawtypes")
+  private final EventHandler eventHandler;
+
+  @SuppressWarnings("rawtypes")
+  public RMContainerReuseRequestor(
+      EventHandler eventHandler,
+      RMCommunicator rmCommunicator) {
+    super(eventHandler, rmCommunicator);
+    this.rmCommunicator = rmCommunicator;
+    this.eventHandler = eventHandler;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    containerReuseMaxMapTasks = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKS);
+    containerReuseMaxReduceTasks = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS);
+    maxMapTaskContainers = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS);
+    maxReduceTaskContainers = conf.getInt(
+        MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS,
+        MRJobConfig.DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  public AllocateResponse makeRemoteRequest()
+      throws YarnException, IOException {
+    AllocateResponse amResponse = super.makeRemoteRequest();
+    synchronized (containersToReuse) {
+      List<Container> allocatedContainers = amResponse.getAllocatedContainers();
+      allocatedContainers.addAll(containersToReuse.keySet());
+      containersToReuse.clear();
+    }
+    return amResponse;
+  }
+
+  @Override
+  public void containerFailedOnHost(String hostName) {
+    super.containerFailedOnHost(hostName);
+    boolean blacklisted = super.isNodeBlacklisted(hostName);
+    if (blacklisted) {
+      Set<Container> containersOnHost = new HashSet<Container>();
+      for (Entry<Container, HostInfo> elem : containersToReuse.entrySet()) {
+        if (elem.getValue().getHost().equals(hostName)) {
+          containersOnHost.add(elem.getKey());
+        }
+      }
+      for (Container container : containersOnHost) {
+        containersToReuse.remove(container);
+      }
+    }
+  }
+
+  @Override
+  public void handle(ContainerAvailableEvent event) {
+    Container container = event.getContainer();
+    ContainerId containerId = container.getId();
+    String resourceName = container.getNodeId().getHost();
+    boolean canReuse = false;
+    Priority priority = container.getPriority();
+    if (RMContainerAllocator.PRIORITY_MAP.equals(priority)
+        || RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) {
+      List<TaskAttemptId> containerTaskAttempts = null;
+      containerTaskAttempts = containerToTaskAttemptsMap.get(containerId);
+      if (containerTaskAttempts == null) {
+        containerTaskAttempts = new ArrayList<TaskAttemptId>();
+        containerToTaskAttemptsMap.put(containerId, containerTaskAttempts);
+      }
+      TaskAttemptId taskAttemptId = event.getTaskAttemptId();
+      if (checkMapContainerReuseConstraints(priority, containerTaskAttempts)
+          || checkReduceContainerReuseConstraints(priority,
+              containerTaskAttempts)) {
+        Map<String, Map<Resource, ResourceRequest>> resourceRequests =
+            remoteRequestsTable.get(priority);
+        // If there are any eligible requests
+        if (resourceRequests != null && !resourceRequests.isEmpty()) {
+          canReuse = true;
+          containerTaskAttempts.add(taskAttemptId);
+        }
+      }
+      ((RMContainerAllocator) rmCommunicator)
+          .resetContainerForReuse(container.getId());
+      if (canReuse) {
+        int shufflePort =
+            rmCommunicator.getJob().getTask(taskAttemptId.getTaskId())
+                .getAttempt(taskAttemptId).getShufflePort();
+        containersToReuse.put(container,
+            new HostInfo(resourceName, shufflePort));
+        incrementRunningReuseContainers(priority);
+        LOG.info("Adding the " + containerId + " for reuse.");
+      } else {
+        LOG.info("Releasing the container : " + containerId
+            + " since it is not eligible for reuse or no pending requests.");
+        containerComplete(container);
+        pendingRelease.add(containerId);
+        release(containerId);
+      }
+    }
+  }
+
+  private boolean checkMapContainerReuseConstraints(Priority priority,
+      List<TaskAttemptId> containerTaskAttempts) {
+    return RMContainerAllocator.PRIORITY_MAP.equals(priority)
+        // Check for how many tasks can map task container run maximum
+        && ((containerTaskAttempts.size() < containerReuseMaxMapTasks
+            || containerReuseMaxMapTasks == -1)
+            // Check for no of map task containers running
+            && (noOfMapTaskContainersForReuse < maxMapTaskContainers
+                || maxMapTaskContainers == -1));
+  }
+
+  private boolean checkReduceContainerReuseConstraints(Priority priority,
+      List<TaskAttemptId> containerTaskAttempts) {
+    return RMContainerAllocator.PRIORITY_REDUCE.equals(priority)
+        // Check for how many tasks can reduce task container run maximum
+        && ((containerTaskAttempts.size() < containerReuseMaxReduceTasks
+            || containerReuseMaxReduceTasks == -1)
+            // Check for no of reduce task containers running
+            && (noOfReduceTaskContainersForReuse < maxReduceTaskContainers
+                || maxReduceTaskContainers == -1));
+  }
+
+  private void containerComplete(Container container) {
+    if (!containerToTaskAttemptsMap.containsKey(container.getId())) {
+      return;
+    }
+    containerToTaskAttemptsMap.remove(container.getId());
+    if (RMContainerAllocator.PRIORITY_MAP.equals(container.getPriority())) {
+      noOfMapTaskContainersForReuse--;
+    } else if (RMContainerAllocator.PRIORITY_REDUCE
+        .equals(container.getPriority())) {
+      noOfReduceTaskContainersForReuse--;
+    }
+  }
+
+  private void incrementRunningReuseContainers(Priority priority) {
+    if (RMContainerAllocator.PRIORITY_MAP.equals(priority)) {
+      noOfMapTaskContainersForReuse++;
+    } else if (RMContainerAllocator.PRIORITY_REDUCE.equals(priority)) {
+      noOfReduceTaskContainersForReuse++;
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  Map<Container, HostInfo> getContainersToReuse() {
+    return containersToReuse;
+  }
+
+  /**
+   * Container Available EventType.
+   */
+  public static enum EventType {
+    CONTAINER_AVAILABLE
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerAssigned(Container allocated, ContainerRequest req,
+      Map<ApplicationAccessType, String> applicationACLs) {
+    if (containersToReuse.containsKey(allocated)) {
+      decContainerReq(req);
+      // send the container-assigned event to task attempt
+      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+          req.attemptID, allocated, applicationACLs));
+    } else {
+      super.containerAssigned(allocated, req, applicationACLs);
+    }
+  }
+
+  static class HostInfo {
+    private String host;
+    private int port;
+
+    public HostInfo(String host, int port) {
+      super();
+      this.host = host;
+      this.port = port;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public int getPort() {
+      return port;
+    }
+  }
+}

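RMContainerReuseRequestor.serviceInit above reads four new MRJobConfig keys that bound how far reuse may go. A sketch of tuning them; the numeric values are arbitrary examples, and -1 is treated as "no limit" by the constraint checks above:

package org.example.mrreuse;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public final class ContainerReuseLimits {
  public static void apply(Configuration conf) {
    // Max task attempts that may run, one after another, in one reused
    // container (checked against containerTaskAttempts.size() in handle()).
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, 5);
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, 2);
    // Max number of containers held for reuse at the same time
    // (checked against noOfMap/ReduceTaskContainersForReuse).
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, 10);
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, 4);
  }
}
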
@@ -209,7 +209,7 @@ public class MRAppBenchmark {
         ClientService clientService, AppContext context) {

       AMPreemptionPolicy policy = new NoopAMPreemptionPolicy();
-      return new RMContainerAllocator(clientService, context, policy) {
+      return new RMContainerAllocator(clientService, context, policy, null) {
         @Override
         protected ApplicationMasterProtocol createSchedulerProxy() {
           return new ApplicationMasterProtocol() {

@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.anyInt;
|
|||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.Mockito.doCallRealMethod;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.inOrder;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
|
@ -674,7 +675,7 @@ public class TestRMContainerAllocator {
|
|||
Resource.newInstance(1024, 1),
|
||||
locations, false, true));
|
||||
allocator.scheduleAllReduces();
|
||||
allocator.makeRemoteRequest();
|
||||
allocator.containerRequestor.makeRemoteRequest();
|
||||
nm.nodeHeartbeat(true);
|
||||
rm.drainEvents();
|
||||
allocator.sendRequest(createRequest(jobId, 1,
|
||||
|
@ -2026,7 +2027,8 @@ public class TestRMContainerAllocator {
|
|||
// Use this constructor when using a real job.
|
||||
MyContainerAllocator(MyResourceManager rm,
|
||||
ApplicationAttemptId appAttemptId, AppContext context) {
|
||||
super(createMockClientService(), context, new NoopAMPreemptionPolicy());
|
||||
super(createMockClientService(), context, new NoopAMPreemptionPolicy(),
|
||||
null);
|
||||
this.rm = rm;
|
||||
}
|
||||
|
||||
|
@ -2034,7 +2036,7 @@ public class TestRMContainerAllocator {
|
|||
public MyContainerAllocator(MyResourceManager rm, Configuration conf,
|
||||
ApplicationAttemptId appAttemptId, Job job) {
|
||||
super(createMockClientService(), createAppContext(appAttemptId, job),
|
||||
new NoopAMPreemptionPolicy());
|
||||
new NoopAMPreemptionPolicy(), null);
|
||||
this.rm = rm;
|
||||
super.init(conf);
|
||||
super.start();
|
||||
|
@ -2044,7 +2046,7 @@ public class TestRMContainerAllocator {
|
|||
ApplicationAttemptId appAttemptId, Job job, Clock clock) {
|
||||
super(createMockClientService(),
|
||||
createAppContext(appAttemptId, job, clock),
|
||||
new NoopAMPreemptionPolicy());
|
||||
new NoopAMPreemptionPolicy(), null);
|
||||
this.rm = rm;
|
||||
super.init(conf);
|
||||
super.start();
|
||||
|
@ -2141,24 +2143,18 @@ public class TestRMContainerAllocator {
|
|||
public void updateSchedulerProxy(MyResourceManager rm) {
|
||||
scheduler = rm.getApplicationMasterService();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AllocateResponse makeRemoteRequest() throws IOException,
|
||||
YarnException {
|
||||
allocateResponse = super.makeRemoteRequest();
|
||||
return allocateResponse;
|
||||
}
|
||||
}
|
||||
|
||||
private static class MyContainerAllocator2 extends MyContainerAllocator {
|
||||
public MyContainerAllocator2(MyResourceManager rm, Configuration conf,
|
||||
ApplicationAttemptId appAttemptId, Job job) {
|
||||
ApplicationAttemptId appAttemptId, Job job)
|
||||
throws YarnException, IOException {
|
||||
super(rm, conf, appAttemptId, job);
|
||||
}
|
||||
@Override
|
||||
protected AllocateResponse makeRemoteRequest() throws IOException,
|
||||
YarnException {
|
||||
throw new IOException("for testing");
|
||||
containerRequestor = mock(RMContainerRequestor.class);
|
||||
doThrow(new IOException("for testing")).when(containerRequestor)
|
||||
.makeRemoteRequest();
|
||||
doReturn(Resource.newInstance(2048, 1)).when(containerRequestor)
|
||||
.getAvailableResources();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2171,7 +2167,7 @@ public class TestRMContainerAllocator {
|
|||
AMPreemptionPolicy policy = mock(AMPreemptionPolicy.class);
|
||||
when(communicator.getJob()).thenReturn(mockJob);
|
||||
RMContainerAllocator allocator = new RMContainerAllocator(service, context,
|
||||
policy);
|
||||
policy, null);
|
||||
AllocateResponse response = Records.newRecord(AllocateResponse.class);
|
||||
allocator.handleJobPriorityChange(response);
|
||||
}
|
||||
|
@ -2196,6 +2192,7 @@ public class TestRMContainerAllocator {
|
|||
any(Resource.class), anyInt(), anyFloat(), anyFloat());
|
||||
doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator)
|
||||
.getSchedulerResourceTypes();
|
||||
allocator.containerRequestor = mock(RMContainerRequestor.class);
|
||||
|
||||
// Test slow-start
|
||||
allocator.scheduleReduces(
|
||||
|
@ -2373,7 +2370,7 @@ public class TestRMContainerAllocator {
|
|||
|
||||
RMContainerAllocator allocator = new RMContainerAllocator(
|
||||
mock(ClientService.class), appContext,
|
||||
new NoopAMPreemptionPolicy()) {
|
||||
new NoopAMPreemptionPolicy(), null) {
|
||||
@Override
|
||||
protected void register() {
|
||||
}
|
||||
|
@ -2424,8 +2421,8 @@ public class TestRMContainerAllocator {
|
|||
public void testCompletedContainerEvent() {
|
||||
RMContainerAllocator allocator = new RMContainerAllocator(
|
||||
mock(ClientService.class), mock(AppContext.class),
|
||||
new NoopAMPreemptionPolicy());
|
||||
|
||||
new NoopAMPreemptionPolicy(), null);
|
||||
|
||||
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
|
||||
MRBuilderUtils.newTaskId(
|
||||
MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
|
||||
|
@ -2972,11 +2969,6 @@ public class TestRMContainerAllocator {
|
|||
return mockScheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setRequestLimit(Priority priority,
|
||||
Resource capability, int limit) {
|
||||
Assert.fail("setRequestLimit() should not be invoked");
|
||||
}
|
||||
};
|
||||
|
||||
// create some map requests
|
||||
|
@ -3257,8 +3249,8 @@ public class TestRMContainerAllocator {
|
|||
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
|
||||
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());
|
||||
|
||||
Assert.assertEquals(6, allocator.getAsk().size());
|
||||
for (ResourceRequest req : allocator.getAsk()) {
|
||||
Assert.assertEquals(6, allocator.containerRequestor.getAsk().size());
|
||||
for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
|
||||
boolean isReduce =
|
||||
req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
|
||||
if (isReduce) {
|
||||
|
@ -3283,8 +3275,8 @@ public class TestRMContainerAllocator {
|
|||
// indicate ramping down of reduces to scheduler.
|
||||
Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
|
||||
Assert.assertEquals(2, allocator.getNumOfPendingReduces());
|
||||
Assert.assertEquals(3, allocator.getAsk().size());
|
||||
for (ResourceRequest req : allocator.getAsk()) {
|
||||
Assert.assertEquals(3, allocator.containerRequestor.getAsk().size());
|
||||
for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
|
||||
Assert.assertEquals(
|
||||
RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
|
||||
Assert.assertTrue(req.getResourceName().equals("*") ||
|
||||
|
@ -3311,6 +3303,7 @@ public class TestRMContainerAllocator {
|
|||
RMContainerAllocator containerAllocator =
|
||||
new RMContainerAllocatorForFinishedContainer(null, context,
|
||||
mock(AMPreemptionPolicy.class));
|
||||
containerAllocator.init(new Configuration());
|
||||
|
||||
ContainerStatus finishedContainer = ContainerStatus.newInstance(
|
||||
mock(ContainerId.class), ContainerState.COMPLETE, "", 0);
|
||||
|
@ -3327,7 +3320,7 @@ public class TestRMContainerAllocator {
    extends RMContainerAllocator {
  public RMContainerAllocatorForFinishedContainer(ClientService clientService,
      AppContext context, AMPreemptionPolicy preemptionPolicy) {
    super(clientService, context, preemptionPolicy);
    super(clientService, context, preemptionPolicy, null);
  }
  @Override
  protected AssignedRequests createAssignedRequests() {

@ -3427,8 +3420,8 @@ public class TestRMContainerAllocator {
Assert.assertEquals(1, allocator.getScheduledRequests().maps.size());
Assert.assertEquals(0, allocator.getAssignedRequests().maps.size());

Assert.assertEquals(6, allocator.getAsk().size());
for (ResourceRequest req : allocator.getAsk()) {
Assert.assertEquals(6, allocator.containerRequestor.getAsk().size());
for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
  boolean isReduce =
      req.getPriority().equals(RMContainerAllocator.PRIORITY_REDUCE);
  if (isReduce) {

@ -3456,8 +3449,8 @@ public class TestRMContainerAllocator {
// indicate ramping down of reduces to scheduler.
Assert.assertEquals(0, allocator.getScheduledRequests().reduces.size());
Assert.assertEquals(2, allocator.getNumOfPendingReduces());
Assert.assertEquals(3, allocator.getAsk().size());
for (ResourceRequest req : allocator.getAsk()) {
Assert.assertEquals(3, allocator.containerRequestor.getAsk().size());
for (ResourceRequest req : allocator.containerRequestor.getAsk()) {
  Assert.assertEquals(
      RMContainerAllocator.PRIORITY_REDUCE, req.getPriority());
  Assert.assertTrue(req.getResourceName().equals("*") ||

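A pattern worth noting across the hunks above: the RMContainerAllocator constructors gain a trailing fourth argument (null throughout these tests), and the tests now read outstanding asks through allocator.containerRequestor rather than from the allocator itself. A minimal sketch of that delegation, using hypothetical stand-in types rather than the real Hadoop classes:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Stand-in names (AskTracker, SketchAllocator) are illustrative only; they
// model the refactoring shown in the assertions, not the real AM classes.
class AskTracker {
  // Outstanding resource asks, previously bookkept by the allocator itself.
  private final Set<String> ask = new HashSet<>();

  void addAsk(String resourceRequest) {
    ask.add(resourceRequest);
  }

  Set<String> getAsk() {
    return Collections.unmodifiableSet(ask);
  }
}

class SketchAllocator {
  // Tests now reach the ask through this field: allocator.containerRequestor.
  final AskTracker containerRequestor = new AskTracker();
}
```

Splitting request bookkeeping into a requestor presumably lets a reuse-aware requestor subclass change how asks are tracked without touching the allocator's scheduling logic.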
@ -0,0 +1,321 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.v2.app.rm;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.EventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerReuseRequestor.HostInfo;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for RMContainerReuseRequestor.
 */
public class TestRMContainerReuseRequestor {

  private RMContainerReuseRequestor reuseRequestor;

  @Before
  public void setup() throws IOException {
    RMContainerAllocator allocator = mock(RMContainerAllocator.class);
    Job job = mock(Job.class);
    Task task = mock(Task.class);
    TaskAttempt taskAttempt = mock(TaskAttempt.class);
    when(taskAttempt.getShufflePort()).thenReturn(0);
    when(task.getAttempt(any(TaskAttemptId.class))).thenReturn(taskAttempt);
    when(job.getTask(any(TaskId.class))).thenReturn(task);
    when(allocator.getJob()).thenReturn(job);
    reuseRequestor = new RMContainerReuseRequestor(null, allocator);
  }

  @Test
  public void testNoOfTimesEachMapTaskContainerCanReuseWithDefaultConfig() {
    // Verify the number of times each map task container can be reused with
    // the default configuration for
    // 'yarn.app.mapreduce.container.reuse.max-maptasks'.
    testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.MAP,
        RMContainerAllocator.PRIORITY_MAP);
  }

  @Test
  public void testNoOfTimesEachMapTaskContainerCanReuseWithConfigLimit() {
    // Verify the number of times each map task container can be reused when
    // 'yarn.app.mapreduce.container.reuse.max-maptasks' is configured with a
    // limit.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, 1);
    testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.MAP,
        RMContainerAllocator.PRIORITY_MAP, conf);
  }

  @Test
  public void testNoOfTimesEachRedTaskContainerCanReuseWithDefaultConfig() {
    // Verify the number of times each reduce task container can be reused
    // with the default configuration for
    // 'yarn.app.mapreduce.container.reuse.max-reducetasks'.
    testNoOfTimesEachContainerCanReuseWithDefaultConfig(TaskType.REDUCE,
        RMContainerAllocator.PRIORITY_REDUCE);
  }

  @Test
  public void testNoOfTimesEachRedTaskContainerCanReuseWithConfigLimit() {
    // Verify the number of times each reduce task container can be reused
    // when 'yarn.app.mapreduce.container.reuse.max-reducetasks' is configured
    // with a limit.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS, 1);
    testNoOfTimesEachContainerCanReuseWithConfigLimit(TaskType.REDUCE,
        RMContainerAllocator.PRIORITY_REDUCE, conf);
  }

  @Test
  public void testNoOfMaxMapTaskContainersCanReuseWithDefaultConfig() {
    // Verify the maximum number of map task containers that can be held for
    // reuse at any time with the default configuration for
    // 'yarn.app.mapreduce.container.reuse.max-maptaskcontainers'.
    testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.MAP,
        RMContainerAllocator.PRIORITY_MAP);
  }

  @Test
  public void testNoOfMaxMapTaskContainersCanReuseWithConfigLimit() {
    // Verify the maximum number of map task containers that can be held for
    // reuse at any time when
    // 'yarn.app.mapreduce.container.reuse.max-maptaskcontainers' is
    // configured with a limit.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS, 1);
    testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.MAP,
        RMContainerAllocator.PRIORITY_MAP, conf);
  }

  @Test
  public void testNoOfMaxRedTaskContainersCanReuseWithDefaultConfig() {
    // Verify the maximum number of reduce task containers that can be held
    // for reuse at any time with the default configuration for
    // 'yarn.app.mapreduce.container.reuse.max-reducetaskcontainers'.
    testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType.REDUCE,
        RMContainerAllocator.PRIORITY_REDUCE);
  }

  @Test
  public void testNoOfMaxRedTaskContainersCanReuseWithConfigLimit() {
    // Verify the maximum number of reduce task containers that can be held
    // for reuse at any time when
    // 'yarn.app.mapreduce.container.reuse.max-reducetaskcontainers' is
    // configured with a limit.
    Configuration conf = new Configuration();
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS, 1);
    testNoOfMaxContainersCanReuseWithConfigLimit(TaskType.REDUCE,
        RMContainerAllocator.PRIORITY_REDUCE, conf);
  }

  @Test
  public void testContainerFailedOnHost() throws Exception {
    reuseRequestor.serviceInit(new Configuration());
    Map<Container, HostInfo> containersToReuse = reuseRequestor
        .getContainersToReuse();
    containersToReuse
        .put(newContainerInstance("container_1472171035081_0009_01_000008",
            RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node1", 1999));
    containersToReuse
        .put(newContainerInstance("container_1472171035081_0009_01_000009",
            RMContainerAllocator.PRIORITY_REDUCE), new HostInfo("node2", 1999));
    reuseRequestor.getBlacklistedNodes().add("node1");
    // containerFailedOnHost removes every reusable container hosted on node1,
    // leaving only the container on node2 behind.
    reuseRequestor.containerFailedOnHost("node1");
    Assert.assertEquals("Containers on node1 should not be present in the"
        + " reuse list.", 1, containersToReuse.size());
    // containersToReuse stays unchanged when the failed host has no reusable
    // containers on it.
    reuseRequestor.containerFailedOnHost("node3");
    Assert.assertEquals(1, containersToReuse.size());
  }

  private void testNoOfTimesEachContainerCanReuseWithDefaultConfig(
      TaskType taskType, Priority priority) {
    // Verify the number of times each container can be reused.

    // Add 10 container requests to the requestor.
    addContainerReqs(priority);
    Container container = newContainerInstance(
        "container_123456789_0001_01_000002", priority);
    for (int i = 0; i < 10; i++) {
      JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
      TaskId taskId = MRBuilderUtils.newTaskId(jobId, i + 1, taskType);
      TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 1);
      ContainerAvailableEvent event = new ContainerAvailableEvent(
          EventType.CONTAINER_AVAILABLE, taskAttemptId, container);
      reuseRequestor.handle(event);
      Map<Container, HostInfo> containersToReuse = reuseRequestor
          .getContainersToReuse();
      Assert.assertTrue("Container should be added for reuse.",
          containersToReuse.containsKey(container));
    }
  }

  private void testNoOfTimesEachContainerCanReuseWithConfigLimit(
      TaskType taskType, Priority priority, Configuration conf) {
    reuseRequestor.init(conf);
    // Add a container request.
    ContainerRequest req1 = new ContainerRequest(null,
        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
        null);
    reuseRequestor.addContainerReq(req1);
    // Add another container request.
    ContainerRequest req2 = new ContainerRequest(null,
        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
        null);
    reuseRequestor.addContainerReq(req2);

    EventType eventType = EventType.CONTAINER_AVAILABLE;
    Container container = newContainerInstance(
        "container_123456789_0001_01_000002", priority);
    JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
    TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType);
    TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1);

    TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType);
    TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1);

    ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
        taskAttemptId1, container);
    reuseRequestor.handle(event1);
    Map<Container, HostInfo> containersToReuse = reuseRequestor
        .getContainersToReuse();
    // The container is reused the first time.
    Assert.assertTrue("Container should be added for reuse.",
        containersToReuse.containsKey(container));
    containersToReuse.clear();
    ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType,
        taskAttemptId2, container);
    reuseRequestor.handle(event2);
    // It should not be reused again because it has already been reused once
    // and the configured limit is 1.
    Assert.assertFalse("Container should not be added for reuse.",
        containersToReuse.containsKey(container));
  }

  private void testNoOfMaxContainersCanReuseWithDefaultConfig(TaskType taskType,
      Priority priority) {
    // Verify how many containers can be held for reuse at any time.

    // Add 10 container requests to the requestor.
    addContainerReqs(priority);
    for (int i = 0; i < 10; i++) {
      Container container = newContainerInstance(
          "container_123456789_0001_01_00000" + (i + 2), priority);
      JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
      TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, i + 1, taskType);
      TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1,
          1);
      ContainerAvailableEvent event1 = new ContainerAvailableEvent(
          EventType.CONTAINER_AVAILABLE, taskAttemptId1, container);
      reuseRequestor.handle(event1);
      Map<Container, HostInfo> containersToReuse = reuseRequestor
          .getContainersToReuse();
      Assert.assertTrue("Container should be added for reuse.",
          containersToReuse.containsKey(container));
    }
  }

  private void testNoOfMaxContainersCanReuseWithConfigLimit(TaskType taskType,
      Priority priority, Configuration conf) {
    reuseRequestor.init(conf);
    ContainerRequest req1 = new ContainerRequest(null,
        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
        null);
    reuseRequestor.addContainerReq(req1);

    ContainerRequest req2 = new ContainerRequest(null,
        Resource.newInstance(2048, 1), new String[0], new String[0], priority,
        null);
    reuseRequestor.addContainerReq(req2);

    EventType eventType = EventType.CONTAINER_AVAILABLE;
    Container container1 = newContainerInstance(
        "container_123456789_0001_01_000002", priority);
    JobId jobId = MRBuilderUtils.newJobId(123456789, 1, 1);
    TaskId taskId1 = MRBuilderUtils.newTaskId(jobId, 1, taskType);
    TaskAttemptId taskAttemptId1 = MRBuilderUtils.newTaskAttemptId(taskId1, 1);

    TaskId taskId2 = MRBuilderUtils.newTaskId(jobId, 2, taskType);
    TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId(taskId2, 1);

    ContainerAvailableEvent event1 = new ContainerAvailableEvent(eventType,
        taskAttemptId1, container1);
    reuseRequestor.handle(event1);
    Map<Container, HostInfo> containersToReuse = reuseRequestor
        .getContainersToReuse();
    Assert.assertTrue("Container should be added for reuse.",
        containersToReuse.containsKey(container1));
    containersToReuse.clear();
    Container container2 = newContainerInstance(
        "container_123456789_0001_01_000003", priority);
    ContainerAvailableEvent event2 = new ContainerAvailableEvent(eventType,
        taskAttemptId2, container2);
    reuseRequestor.handle(event2);
    Assert.assertFalse("Container should not be added for reuse.",
        containersToReuse.containsKey(container2));
  }

  private void addContainerReqs(Priority priority) {
    Configuration conf = new Configuration();
    reuseRequestor.init(conf);
    for (int i = 0; i < 10; i++) {
      ContainerRequest req = new ContainerRequest(null,
          Resource.newInstance(2048, 1), new String[0], new String[0], priority,
          null);
      reuseRequestor.addContainerReq(req);
    }
  }

  private Container newContainerInstance(String containerId,
      Priority priority) {
    return Container.newInstance(ContainerId.fromString(containerId),
        NodeId.newInstance("node1", 8080), "", null, priority, null);
  }

  @After
  public void tearDown() {
    reuseRequestor.stop();
  }
}
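These tests pin down two independent limits: how many tasks a single container may run before it stops being reused, and how many containers may be held for reuse at once. A self-contained sketch of that bookkeeping, under the assumption (not shown in this diff) that the requestor keeps a per-container use count alongside the reuse map; all names here are illustrative, not the real RMContainerReuseRequestor:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the reuse bookkeeping exercised above.
class ReuseBookkeepingSketch {
  private final int maxTasksPerContainer;  // cf. container.reuse.max-maptasks
  private final int maxHeldContainers;     // cf. ...max-maptaskcontainers
  private final Map<String, Integer> tasksRun = new HashMap<>();
  private final Map<String, String> containersToReuse = new HashMap<>();

  ReuseBookkeepingSketch(int maxTasksPerContainer, int maxHeldContainers) {
    this.maxTasksPerContainer = maxTasksPerContainer;
    this.maxHeldContainers = maxHeldContainers;
  }

  /** A container finished a task; decide whether to hold it for reuse. */
  boolean offerForReuse(String containerId, String host) {
    int used = tasksRun.merge(containerId, 1, Integer::sum);
    boolean underTaskLimit =
        maxTasksPerContainer < 0 || used <= maxTasksPerContainer;
    boolean underHoldLimit =
        maxHeldContainers < 0 || containersToReuse.size() < maxHeldContainers;
    if (underTaskLimit && underHoldLimit) {
      containersToReuse.put(containerId, host);
      return true;
    }
    return false;
  }

  /** Mirrors containerFailedOnHost: drop held containers on a failed node. */
  void containerFailedOnHost(String host) {
    containersToReuse.values().removeIf(h -> h.equals(host));
  }
}
```

One simplification to note: the tests clear the reuse map between events and still see the limits enforced, so the real requestor evidently tracks its counts independently of the map; the sketch ties them together only for brevity.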
@ -1295,4 +1295,44 @@ public interface MRJobConfig {
   */
  @Unstable
  String INPUT_FILE_MANDATORY_PREFIX = "mapreduce.job.input.file.must.";

  /**
   * Whether to enable RM container reuse or not.
   */
  String MR_AM_CONTAINER_REUSE_ENABLED = MR_PREFIX + "container.reuse.enabled";
  boolean DEFAULT_MR_AM_CONTAINER_REUSE_ENABLED = false;

  /**
   * Number of times each map task container can be reused. The default value
   * is -1, which implies there is no limit on reusing a container for map
   * tasks.
   */
  String MR_AM_CONTAINER_REUSE_MAX_MAPTASKS = MR_PREFIX
      + "container.reuse.max-maptasks";
  int DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKS = -1;

  /**
   * Number of times each reduce task container can be reused. The default
   * value is -1, which implies there is no limit on reusing a container for
   * reduce tasks.
   */
  String MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS = MR_PREFIX
      + "container.reuse.max-reducetasks";
  int DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKS = -1;

  /**
   * Maximum number of map task containers that can be held for reuse at any
   * point of time. The default value is -1, which implies there is no limit
   * on the number of map task containers held for reuse.
   */
  String MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS = MR_PREFIX
      + "container.reuse.max-maptaskcontainers";
  int DEFAULT_MR_AM_CONTAINER_REUSE_MAX_MAPTASKCONTAINERS = -1;

  /**
   * Maximum number of reduce task containers that can be held for reuse at
   * any point of time. The default value is -1, which implies there is no
   * limit on the number of reduce task containers held for reuse.
   */
  String MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS = MR_PREFIX
      + "container.reuse.max-reducetaskcontainers";
  int DEFAULT_MR_AM_CONTAINER_REUSE_MAX_REDUCETASKCONTAINERS = -1;
}
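Assuming the MR AM honours these keys as the constants and defaults above suggest, a job could opt in to container reuse programmatically. A minimal sketch (the job body itself is elided):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class ContainerReuseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Container reuse is off by default; opt in explicitly.
    conf.setBoolean(MRJobConfig.MR_AM_CONTAINER_REUSE_ENABLED, true);
    // Cap each map container at 5 tasks; leave the remaining limits at the
    // unbounded default of -1.
    conf.setInt(MRJobConfig.MR_AM_CONTAINER_REUSE_MAX_MAPTASKS, 5);

    Job job = Job.getInstance(conf, "container-reuse-example");
    // ... set jar, mapper, reducer, and input/output paths here ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```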
@ -2231,4 +2231,49 @@
  </description>
</property>

<property>
  <description>
    Whether to enable RM container reuse or not.
  </description>
  <name>yarn.app.mapreduce.container.reuse.enabled</name>
  <value>false</value>
</property>

<property>
  <description>
    Number of times each map task container can be reused. The default value
    is -1, which implies there is no limit on reusing a container for map
    tasks.
  </description>
  <name>yarn.app.mapreduce.container.reuse.max-maptasks</name>
  <value>-1</value>
</property>

<property>
  <description>
    Number of times each reduce task container can be reused. The default
    value is -1, which implies there is no limit on reusing a container for
    reduce tasks.
  </description>
  <name>yarn.app.mapreduce.container.reuse.max-reducetasks</name>
  <value>-1</value>
</property>

<property>
  <description>
    Maximum number of map task containers that can be held for reuse at any
    point of time. The default value is -1, which implies there is no limit
    on the number of map task containers held for reuse.
  </description>
  <name>yarn.app.mapreduce.container.reuse.max-maptaskcontainers</name>
  <value>-1</value>
</property>

<property>
  <description>
    Maximum number of reduce task containers that can be held for reuse at
    any point of time. The default value is -1, which implies there is no
    limit on the number of reduce task containers held for reuse.
  </description>
  <name>yarn.app.mapreduce.container.reuse.max-reducetaskcontainers</name>
  <value>-1</value>
</property>
</configuration>
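Site-wide defaults can equally be overridden in mapred-site.xml; an illustrative override (the values here are examples, not recommendations):

```xml
<property>
  <name>yarn.app.mapreduce.container.reuse.enabled</name>
  <value>true</value>
</property>
<property>
  <!-- Hold at most 10 map task containers for reuse at a time. -->
  <name>yarn.app.mapreduce.container.reuse.max-maptaskcontainers</name>
  <value>10</value>
</property>
```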