From 08f8abf5639d39167952dc5120b44fe35c63ff7a Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Thu, 5 Jan 2012 01:37:13 +0000 Subject: [PATCH] MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for performance reasons. Contributed by Vinod K V. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227426 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapreduce/v2/app/rm/RMCommunicator.java | 3 +- .../v2/app/rm/RMContainerAllocator.java | 84 +++++++++++++++---- .../v2/app/TestRMContainerAllocator.java | 4 +- 4 files changed, 75 insertions(+), 19 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 3439cf3f412..b9fe77109ae 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -406,6 +406,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3566. Fixed MR AM to construct CLC only once across all tasks. (vinodkv via acmurthy) + MAPREDUCE-3572. Moved AM event dispatcher to a separate thread for + performance reasons. (vinodkv via acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 5276276c4e8..b138e9a6619 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -104,10 +104,9 @@ public abstract class RMCommunicator extends AbstractService { @Override public void start() { scheduler= createSchedulerProxy(); - //LOG.info("Scheduler is " + scheduler); register(); startAllocatorThread(); - JobID id = TypeConverter.fromYarn(context.getApplicationID()); + JobID id = TypeConverter.fromYarn(this.applicationId); JobId jobId = TypeConverter.toYarn(id); job = context.getJob(jobId); super.start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 74e2c1b2a8f..4adbfc6d8d3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -30,18 +30,17 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobCounter; -import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; -import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; @@ -69,7 +68,7 @@ import org.apache.hadoop.yarn.util.RackResolver; public class RMContainerAllocator extends RMContainerRequestor implements ContainerAllocator { - private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); + static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); public static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; @@ -77,7 +76,10 @@ public class RMContainerAllocator extends RMContainerRequestor private static final Priority PRIORITY_FAST_FAIL_MAP; private static final Priority PRIORITY_REDUCE; private static final Priority PRIORITY_MAP; - + + private Thread eventHandlingThread; + private volatile boolean stopEventHandling; + static { PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); PRIORITY_FAST_FAIL_MAP.setPriority(5); @@ -130,7 +132,10 @@ public class RMContainerAllocator extends RMContainerRequestor private float reduceSlowStart = 0; private long retryInterval; private long retrystartTime; - + + BlockingQueue eventQueue + = new LinkedBlockingQueue(); + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); } @@ -155,6 +160,40 @@ public class RMContainerAllocator extends RMContainerRequestor retrystartTime = System.currentTimeMillis(); } + @Override + public void start() { + this.eventHandlingThread = new Thread() { + @SuppressWarnings("unchecked") + @Override + public void run() { + + ContainerAllocatorEvent event; + + while (!stopEventHandling && !Thread.currentThread().isInterrupted()) { + try { + event = RMContainerAllocator.this.eventQueue.take(); + } catch (InterruptedException e) { + LOG.error("Returning, interrupted : " + e); + return; + } + + try { + handleEvent(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + + " to the ContainreAllocator", t); + // Kill the AM + eventHandler.handle(new JobEvent(getJob().getID(), + JobEventType.INTERNAL_ERROR)); + return; + } + } + } + }; + this.eventHandlingThread.start(); + super.start(); + } + @Override protected synchronized void heartbeat() throws Exception { LOG.info("Before Scheduling: " + getStat()); @@ -181,6 +220,8 @@ public class RMContainerAllocator extends RMContainerRequestor @Override public void stop() { + this.stopEventHandling = true; + eventHandlingThread.interrupt(); super.stop(); LOG.info("Final Stats: " + getStat()); } @@ -192,10 +233,27 @@ public class RMContainerAllocator extends RMContainerRequestor public void setIsReduceStarted(boolean reduceStarted) { this.reduceStarted = reduceStarted; } - - @SuppressWarnings("unchecked") + @Override - public synchronized void handle(ContainerAllocatorEvent event) { + public void handle(ContainerAllocatorEvent event) { + int qSize = eventQueue.size(); + if (qSize != 0 && qSize % 1000 == 0) { + LOG.info("Size of event-queue in RMContainerAllocator is " + qSize); + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue " + + "of RMContainerAllocator: " + remCapacity); + } + try { + eventQueue.put(event); + } catch (InterruptedException e) { + throw new YarnException(e); + } + } + + @SuppressWarnings({ "unchecked" }) + protected synchronized void handleEvent(ContainerAllocatorEvent event) { LOG.info("Processing the event " + event.toString()); recalculateReduceSchedule = true; if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { @@ -206,9 +264,7 @@ public class RMContainerAllocator extends RMContainerRequestor int minSlotMemSize = getMinContainerCapability().getMemory(); mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize; - JobID id = TypeConverter.fromYarn(applicationId); - JobId jobId = TypeConverter.toYarn(id); - eventHandler.handle(new JobHistoryEvent(jobId, + eventHandler.handle(new JobHistoryEvent(getJob().getID(), new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceReqt))); LOG.info("mapResourceReqt:"+mapResourceReqt); @@ -232,9 +288,7 @@ public class RMContainerAllocator extends RMContainerRequestor //round off on slotsize reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize; - JobID id = TypeConverter.fromYarn(applicationId); - JobId jobId = TypeConverter.toYarn(id); - eventHandler.handle(new JobHistoryEvent(jobId, + eventHandler.handle(new JobHistoryEvent(getJob().getID(), new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, reduceResourceReqt))); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index a4b84b2b53d..db1f20fa49a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -1186,12 +1186,12 @@ public class TestRMContainerAllocator { public void sendRequests(List reqs) { for (ContainerRequestEvent req : reqs) { - super.handle(req); + super.handleEvent(req); } } public void sendFailure(ContainerFailedEvent f) { - super.handle(f); + super.handleEvent(f); } // API to be used by tests