From 6a743f3f5b8166446f33bc9c75e95c8a9c0edc82 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Tue, 13 Nov 2012 23:49:04 +0000 Subject: [PATCH] merge -r 1409031:1409032 from trunk. FIXES: MAPREDUCE-4517 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1409034 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/rm/RMContainerAllocator.java | 93 ++++++++++++++----- 2 files changed, 72 insertions(+), 24 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1c346ffdaba..05b0f780b5f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -513,6 +513,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by default (Ravi Prakash via bobby) + + MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat + (Jason Lowe via tgraves) Release 0.23.4 - UNRELEASED diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1e34365b5dc..96db3f13850 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -145,6 +146,8 @@ added to the pending and are ramped up (added to scheduled) based BlockingQueue eventQueue = new LinkedBlockingQueue(); + private ScheduleStats scheduleStats = new ScheduleStats(); + public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); this.stopped = new AtomicBoolean(false); @@ -208,13 +211,10 @@ public void run() { @Override protected synchronized void heartbeat() throws Exception { - LOG.info("Before Scheduling: " + getStat()); + scheduleStats.updateAndLogIfChanged("Before Scheduling: "); List allocatedContainers = getResources(); - LOG.info("After Scheduling: " + getStat()); if (allocatedContainers.size() > 0) { - LOG.info("Before Assign: " + getStat()); scheduledRequests.assign(allocatedContainers); - LOG.info("After Assign: " + getStat()); } int completedMaps = getJob().getCompletedMaps(); @@ -235,6 +235,8 @@ protected synchronized void heartbeat() throws Exception { maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; } + + scheduleStats.updateAndLogIfChanged("After Scheduling: "); } @Override @@ -245,7 +247,7 @@ public void stop() { } eventHandlingThread.interrupt(); super.stop(); - LOG.info("Final Stats: " + getStat()); + scheduleStats.log("Final Stats: "); } public boolean getIsReduceStarted() { @@ -427,7 +429,9 @@ public void scheduleReduces( return; } - LOG.info("Recalculating schedule..."); + int headRoom = getAvailableResources() != null ? + getAvailableResources().getMemory() : 0; + LOG.info("Recalculating schedule, headroom=" + headRoom); //check for slow start if (!getIsReduceStarted()) {//not set yet @@ -536,24 +540,6 @@ public void rampDownReduces(int rampDown) { } } - /** - * Synchronized to avoid findbugs warnings - */ - private synchronized String getStat() { - return "PendingReduces:" + pendingReduces.size() + - " ScheduledMaps:" + scheduledRequests.maps.size() + - " ScheduledReduces:" + scheduledRequests.reduces.size() + - " AssignedMaps:" + assignedRequests.maps.size() + - " AssignedReduces:" + assignedRequests.reduces.size() + - " completedMaps:" + getJob().getCompletedMaps() + - " completedReduces:" + getJob().getCompletedReduces() + - " containersAllocated:" + containersAllocated + - " containersReleased:" + containersReleased + - " hostLocalAssigned:" + hostLocalAssigned + - " rackLocalAssigned:" + rackLocalAssigned + - " availableResources(headroom):" + getAvailableResources(); - } - @SuppressWarnings("unchecked") private List getResources() throws Exception { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null @@ -595,6 +581,9 @@ private List getResources() throws Exception { if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { //something changed recalculateReduceSchedule = true; + if (LOG.isDebugEnabled() && headRoom != newHeadRoom) { + LOG.debug("headroom=" + newHeadRoom); + } } if (LOG.isDebugEnabled()) { @@ -1123,4 +1112,60 @@ ContainerId get(TaskAttemptId tId) { } } } + + private class ScheduleStats { + int numPendingReduces; + int numScheduledMaps; + int numScheduledReduces; + int numAssignedMaps; + int numAssignedReduces; + int numCompletedMaps; + int numCompletedReduces; + int numContainersAllocated; + int numContainersReleased; + + public void updateAndLogIfChanged(String msgPrefix) { + boolean changed = false; + + // synchronized to fix findbug warnings + synchronized (RMContainerAllocator.this) { + changed |= (numPendingReduces != pendingReduces.size()); + numPendingReduces = pendingReduces.size(); + changed |= (numScheduledMaps != scheduledRequests.maps.size()); + numScheduledMaps = scheduledRequests.maps.size(); + changed |= (numScheduledReduces != scheduledRequests.reduces.size()); + numScheduledReduces = scheduledRequests.reduces.size(); + changed |= (numAssignedMaps != assignedRequests.maps.size()); + numAssignedMaps = assignedRequests.maps.size(); + changed |= (numAssignedReduces != assignedRequests.reduces.size()); + numAssignedReduces = assignedRequests.reduces.size(); + changed |= (numCompletedMaps != getJob().getCompletedMaps()); + numCompletedMaps = getJob().getCompletedMaps(); + changed |= (numCompletedReduces != getJob().getCompletedReduces()); + numCompletedReduces = getJob().getCompletedReduces(); + changed |= (numContainersAllocated != containersAllocated); + numContainersAllocated = containersAllocated; + changed |= (numContainersReleased != containersReleased); + numContainersReleased = containersReleased; + } + + if (changed) { + log(msgPrefix); + } + } + + public void log(String msgPrefix) { + LOG.info(msgPrefix + "PendingReds:" + numPendingReduces + + " ScheduledMaps:" + numScheduledMaps + + " ScheduledReds:" + numScheduledReduces + + " AssignedMaps:" + numAssignedMaps + + " AssignedReds:" + numAssignedReduces + + " CompletedMaps:" + numCompletedMaps + + " CompletedReds:" + numCompletedReduces + + " ContAlloc:" + numContainersAllocated + + " ContRel:" + numContainersReleased + + " HostLocal:" + hostLocalAssigned + + " RackLocal:" + rackLocalAssigned); + } + } }