merge -r 1409031:1409032 from trunk. FIXES: MAPREDUCE-4517

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1409034 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-11-13 23:49:04 +00:00
parent 7068073240
commit 6a743f3f5b
2 changed files with 72 additions and 24 deletions

View File

@ -514,6 +514,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by
default (Ravi Prakash via bobby) default (Ravi Prakash via bobby)
MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat
(Jason Lowe via tgraves)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -67,6 +67,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
@ -145,6 +146,8 @@ added to the pending and are ramped up (added to scheduled) based
BlockingQueue<ContainerAllocatorEvent> eventQueue BlockingQueue<ContainerAllocatorEvent> eventQueue
= new LinkedBlockingQueue<ContainerAllocatorEvent>(); = new LinkedBlockingQueue<ContainerAllocatorEvent>();
private ScheduleStats scheduleStats = new ScheduleStats();
public RMContainerAllocator(ClientService clientService, AppContext context) { public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context); super(clientService, context);
this.stopped = new AtomicBoolean(false); this.stopped = new AtomicBoolean(false);
@ -208,13 +211,10 @@ public void run() {
@Override @Override
protected synchronized void heartbeat() throws Exception { protected synchronized void heartbeat() throws Exception {
LOG.info("Before Scheduling: " + getStat()); scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources(); List<Container> allocatedContainers = getResources();
LOG.info("After Scheduling: " + getStat());
if (allocatedContainers.size() > 0) { if (allocatedContainers.size() > 0) {
LOG.info("Before Assign: " + getStat());
scheduledRequests.assign(allocatedContainers); scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
} }
int completedMaps = getJob().getCompletedMaps(); int completedMaps = getJob().getCompletedMaps();
@ -235,6 +235,8 @@ protected synchronized void heartbeat() throws Exception {
maxReduceRampupLimit, reduceSlowStart); maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false; recalculateReduceSchedule = false;
} }
scheduleStats.updateAndLogIfChanged("After Scheduling: ");
} }
@Override @Override
@ -245,7 +247,7 @@ public void stop() {
} }
eventHandlingThread.interrupt(); eventHandlingThread.interrupt();
super.stop(); super.stop();
LOG.info("Final Stats: " + getStat()); scheduleStats.log("Final Stats: ");
} }
public boolean getIsReduceStarted() { public boolean getIsReduceStarted() {
@ -427,7 +429,9 @@ public void scheduleReduces(
return; return;
} }
LOG.info("Recalculating schedule..."); int headRoom = getAvailableResources() != null ?
getAvailableResources().getMemory() : 0;
LOG.info("Recalculating schedule, headroom=" + headRoom);
//check for slow start //check for slow start
if (!getIsReduceStarted()) {//not set yet if (!getIsReduceStarted()) {//not set yet
@ -536,24 +540,6 @@ public void rampDownReduces(int rampDown) {
} }
} }
/**
* Synchronized to avoid findbugs warnings
*/
private synchronized String getStat() {
return "PendingReduces:" + pendingReduces.size() +
" ScheduledMaps:" + scheduledRequests.maps.size() +
" ScheduledReduces:" + scheduledRequests.reduces.size() +
" AssignedMaps:" + assignedRequests.maps.size() +
" AssignedReduces:" + assignedRequests.reduces.size() +
" completedMaps:" + getJob().getCompletedMaps() +
" completedReduces:" + getJob().getCompletedReduces() +
" containersAllocated:" + containersAllocated +
" containersReleased:" + containersReleased +
" hostLocalAssigned:" + hostLocalAssigned +
" rackLocalAssigned:" + rackLocalAssigned +
" availableResources(headroom):" + getAvailableResources();
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception { private List<Container> getResources() throws Exception {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
@ -595,6 +581,9 @@ private List<Container> getResources() throws Exception {
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
//something changed //something changed
recalculateReduceSchedule = true; recalculateReduceSchedule = true;
if (LOG.isDebugEnabled() && headRoom != newHeadRoom) {
LOG.debug("headroom=" + newHeadRoom);
}
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -1123,4 +1112,60 @@ ContainerId get(TaskAttemptId tId) {
} }
} }
} }
private class ScheduleStats {
int numPendingReduces;
int numScheduledMaps;
int numScheduledReduces;
int numAssignedMaps;
int numAssignedReduces;
int numCompletedMaps;
int numCompletedReduces;
int numContainersAllocated;
int numContainersReleased;
public void updateAndLogIfChanged(String msgPrefix) {
boolean changed = false;
// synchronized to fix findbug warnings
synchronized (RMContainerAllocator.this) {
changed |= (numPendingReduces != pendingReduces.size());
numPendingReduces = pendingReduces.size();
changed |= (numScheduledMaps != scheduledRequests.maps.size());
numScheduledMaps = scheduledRequests.maps.size();
changed |= (numScheduledReduces != scheduledRequests.reduces.size());
numScheduledReduces = scheduledRequests.reduces.size();
changed |= (numAssignedMaps != assignedRequests.maps.size());
numAssignedMaps = assignedRequests.maps.size();
changed |= (numAssignedReduces != assignedRequests.reduces.size());
numAssignedReduces = assignedRequests.reduces.size();
changed |= (numCompletedMaps != getJob().getCompletedMaps());
numCompletedMaps = getJob().getCompletedMaps();
changed |= (numCompletedReduces != getJob().getCompletedReduces());
numCompletedReduces = getJob().getCompletedReduces();
changed |= (numContainersAllocated != containersAllocated);
numContainersAllocated = containersAllocated;
changed |= (numContainersReleased != containersReleased);
numContainersReleased = containersReleased;
}
if (changed) {
log(msgPrefix);
}
}
public void log(String msgPrefix) {
LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
" ScheduledMaps:" + numScheduledMaps +
" ScheduledReds:" + numScheduledReduces +
" AssignedMaps:" + numAssignedMaps +
" AssignedReds:" + numAssignedReduces +
" CompletedMaps:" + numCompletedMaps +
" CompletedReds:" + numCompletedReduces +
" ContAlloc:" + numContainersAllocated +
" ContRel:" + numContainersReleased +
" HostLocal:" + hostLocalAssigned +
" RackLocal:" + rackLocalAssigned);
}
}
} }