Merge -c 1227426 from trunk to branch-0.23 to fix MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for performance reasons.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1227428 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-01-05 01:38:49 +00:00
parent 2122954a0d
commit fa86b57c30
4 changed files with 75 additions and 19 deletions

View File

@ -349,6 +349,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks.
(vinodkv via acmurthy)
MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for
performance reasons. (vinodkv via acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -104,10 +104,9 @@ public abstract class RMCommunicator extends AbstractService {
@Override
public void start() {
scheduler= createSchedulerProxy();
//LOG.info("Scheduler is " + scheduler);
register();
startAllocatorThread();
JobID id = TypeConverter.fromYarn(context.getApplicationID());
JobID id = TypeConverter.fromYarn(this.applicationId);
JobId jobId = TypeConverter.toYarn(id);
job = context.getJob(jobId);
super.start();

View File

@ -30,18 +30,17 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@ -69,7 +68,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
public class RMContainerAllocator extends RMContainerRequestor
implements ContainerAllocator {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
public static final
float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
@ -78,6 +77,9 @@ public class RMContainerAllocator extends RMContainerRequestor
private static final Priority PRIORITY_REDUCE;
private static final Priority PRIORITY_MAP;
private Thread eventHandlingThread;
private volatile boolean stopEventHandling;
static {
PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
PRIORITY_FAST_FAIL_MAP.setPriority(5);
@ -131,6 +133,9 @@ public class RMContainerAllocator extends RMContainerRequestor
private long retryInterval;
private long retrystartTime;
BlockingQueue<ContainerAllocatorEvent> eventQueue
= new LinkedBlockingQueue<ContainerAllocatorEvent>();
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
}
@ -155,6 +160,40 @@ public class RMContainerAllocator extends RMContainerRequestor
retrystartTime = System.currentTimeMillis();
}
@Override
public void start() {
this.eventHandlingThread = new Thread() {
@SuppressWarnings("unchecked")
@Override
public void run() {
ContainerAllocatorEvent event;
while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
try {
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e);
return;
}
try {
handleEvent(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " to the ContainreAllocator", t);
// Kill the AM
eventHandler.handle(new JobEvent(getJob().getID(),
JobEventType.INTERNAL_ERROR));
return;
}
}
}
};
this.eventHandlingThread.start();
super.start();
}
@Override
protected synchronized void heartbeat() throws Exception {
LOG.info("Before Scheduling: " + getStat());
@ -181,6 +220,8 @@ public class RMContainerAllocator extends RMContainerRequestor
@Override
public void stop() {
this.stopEventHandling = true;
eventHandlingThread.interrupt();
super.stop();
LOG.info("Final Stats: " + getStat());
}
@ -193,9 +234,26 @@ public class RMContainerAllocator extends RMContainerRequestor
this.reduceStarted = reduceStarted;
}
@SuppressWarnings("unchecked")
@Override
public synchronized void handle(ContainerAllocatorEvent event) {
public void handle(ContainerAllocatorEvent event) {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
+ "of RMContainerAllocator: " + remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnException(e);
}
}
@SuppressWarnings({ "unchecked" })
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
LOG.info("Processing the event " + event.toString());
recalculateReduceSchedule = true;
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
@ -206,9 +264,7 @@ public class RMContainerAllocator extends RMContainerRequestor
int minSlotMemSize = getMinContainerCapability().getMemory();
mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
* minSlotMemSize;
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);
eventHandler.handle(new JobHistoryEvent(jobId,
eventHandler.handle(new JobHistoryEvent(getJob().getID(),
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
mapResourceReqt)));
LOG.info("mapResourceReqt:"+mapResourceReqt);
@ -232,9 +288,7 @@ public class RMContainerAllocator extends RMContainerRequestor
//round off on slotsize
reduceResourceReqt = (int) Math.ceil((float)
reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
JobID id = TypeConverter.fromYarn(applicationId);
JobId jobId = TypeConverter.toYarn(id);
eventHandler.handle(new JobHistoryEvent(jobId,
eventHandler.handle(new JobHistoryEvent(getJob().getID(),
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
reduceResourceReqt)));

View File

@ -1186,12 +1186,12 @@ public class TestRMContainerAllocator {
public void sendRequests(List<ContainerRequestEvent> reqs) {
for (ContainerRequestEvent req : reqs) {
super.handle(req);
super.handleEvent(req);
}
}
public void sendFailure(ContainerFailedEvent f) {
super.handle(f);
super.handleEvent(f);
}
// API to be used by tests