From caea6d2e855b8170667de7fb49acdcc2bf33bdb2 Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Mon, 6 Feb 2012 22:03:30 +0000
Subject: [PATCH] Merge -c 1241205 from trunk to branch-0.23 to fix
 MAPREDUCE-3810. Performance tweaks - reduced logging in AM and defined
 hashCode/equals for ResourceRequest & Priority.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241208 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |  3 +
 .../v2/app/job/impl/TaskAttemptImpl.java      | 18 ++--
 .../mapreduce/v2/app/job/impl/TaskImpl.java   | 11 ++-
 .../mapreduce/v2/app/rm/RMCommunicator.java   | 15 +--
 .../v2/app/rm/RMContainerAllocator.java       | 97 +++++++++++-------
 .../v2/app/rm/RMContainerRequestor.java       | 62 +++++++-----
 .../apache/hadoop/mapreduce/v2/app/MRApp.java |  2 +-
 .../mapreduce/v2/app/MRAppBenchmark.java      | 98 ++++++++++++++++++-
 .../hadoop/yarn/api/records/Priority.java     | 29 +++++-
 .../yarn/api/records/ResourceRequest.java     | 77 ++++++++++++++-
 .../api/records/impl/pb/PriorityPBImpl.java   | 13 +--
 .../impl/pb/ResourceRequestPBImpl.java        | 28 +----
 12 files changed, 329 insertions(+), 124 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9a697c2a38b..8e3d1019eb7 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -665,6 +665,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3814. Fixed MRV1 compilation. (Arun C Murthy via vinodkv)
 
+    MAPREDUCE-3810. Performance tweaks - reduced logging in AM and defined
+    hashCode/equals for ResourceRequest & Priority. (vinodkv via acmurthy)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 7cca98031d5..4f504b381a4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -127,7 +127,7 @@
 /**
  * Implementation of TaskAttempt interface.
  */
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes" })
 public abstract class TaskAttemptImpl implements
     org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
       EventHandler<TaskAttemptEvent> {
@@ -910,8 +910,10 @@ public TaskAttemptState getState() {
   @SuppressWarnings("unchecked")
   @Override
   public void handle(TaskAttemptEvent event) {
-    LOG.info("Processing " + event.getTaskAttemptID() +
-        " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskAttemptID() + " of type "
+          + event.getType());
+    }
     writeLock.lock();
     try {
       final TaskAttemptState oldState = getState();
@@ -1278,15 +1280,11 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
-      String taskType = 
-          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
-      LOG.info("In TaskAttemptImpl taskType: " + taskType);
       long slotMillis = computeSlotMillis(taskAttempt);
-      JobCounterUpdateEvent jce =
-          new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
-              .getJobId());
+      TaskId taskId = taskAttempt.attemptId.getTaskId();
+      JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskId.getJobId());
       jce.addCounterUpdate(
-          taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+          taskId.getTaskType() == TaskType.MAP ?
               JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
           slotMillis);
       taskAttempt.eventHandler.handle(jce);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 24a908112d2..9dc135dc1be 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -81,7 +81,7 @@
 /**
  * Implementation of Task interface.
  */
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
@@ -505,7 +505,9 @@ protected TaskAttempt getSuccessfulAttempt() {
   // This is always called in the Write Lock
   private void addAndScheduleAttempt() {
     TaskAttempt attempt = createAttempt();
-    LOG.info("Created attempt " + attempt.getID());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created attempt " + attempt.getID());
+    }
     switch (attempts.size()) {
       case 0:
         attempts = Collections.singletonMap(attempt.getID(), attempt);
@@ -537,7 +539,10 @@ private void addAndScheduleAttempt() {
 
   @Override
   public void handle(TaskEvent event) {
-    LOG.debug("Processing " + event.getTaskID() + " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getTaskID() + " of type "
+          + event.getType());
+    }
     try {
       writeLock.lock();
       TaskState oldState = getState();
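
The hunks above (and most of the ones that follow) replace unconditional LOG.info calls on the AM's event-handling hot paths with LOG.debug guarded by LOG.isDebugEnabled(). The guard matters because the message string is concatenated before the logger gets a chance to drop it; with the guard, a disabled level costs only a boolean check. A minimal standalone sketch of the pattern, using commons-logging as the AM does (class and parameter names here are illustrative, not from the patch):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class GuardedLoggingSketch {
      private static final Log LOG =
          LogFactory.getLog(GuardedLoggingSketch.class);

      void handle(Object eventId, Object eventType) {
        // Unguarded, the concatenation below would run on every event, even
        // when DEBUG is off -- that is the overhead this patch removes.
        if (LOG.isDebugEnabled()) {
          LOG.debug("Processing " + eventId + " of type " + eventType);
        }
      }
    }

At tens of thousands of task events per job, avoiding both the String building and the synchronous appender write adds up, which is what MRAppBenchmark further down is meant to exercise.
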
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index b138e9a6619..5ff838c5257 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@@ -46,9 +45,9 @@
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -149,7 +148,7 @@ protected void register() {
       LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
       LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
     } catch (Exception are) {
-      LOG.info("Exception while registering", are);
+      LOG.error("Exception while registering", are);
       throw new YarnException(are);
     }
   }
@@ -183,7 +182,7 @@ protected void unregister() {
       request.setTrackingUrl(historyUrl);
       scheduler.finishApplicationMaster(request);
     } catch(Exception are) {
-      LOG.info("Exception while unregistering ", are);
+      LOG.error("Exception while unregistering ", are);
     }
   }
@@ -205,7 +204,7 @@ public void stop() {
     try {
       allocatorThread.join();
     } catch (InterruptedException ie) {
-      LOG.info("InterruptedException while stopping", ie);
+      LOG.warn("InterruptedException while stopping", ie);
     }
     unregister();
     super.stop();
@@ -228,7 +227,7 @@ public void run() {
             // TODO: for other exceptions
           }
         } catch (InterruptedException e) {
-          LOG.info("Allocated thread interrupted. Returning.");
+          LOG.warn("Allocated thread interrupted. Returning.");
           return;
         }
       }
@@ -255,7 +254,9 @@ protected AMRMProtocol createSchedulerProxy() {
     if (UserGroupInformation.isSecurityEnabled()) {
       String tokenURLEncodedStr = System.getenv().get(
           ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
+      }
       Token token = new Token();
 
       try {
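
Besides the logging tweaks, RMCommunicator's changes correct log levels: real failures now surface at ERROR, and interruption of the allocator thread -- an expected part of shutdown -- at WARN. That thread is a periodic heartbeat that must exit promptly when stop() interrupts it. A condensed sketch of the loop shape, with illustrative names (HEARTBEAT_INTERVAL_MS and heartbeat() are stand-ins, not the class's real members):

    public class HeartbeatLoopSketch {
      private static final long HEARTBEAT_INTERVAL_MS = 1000; // assumed period

      private final Thread allocatorThread = new Thread(new Runnable() {
        @Override
        public void run() {
          while (!Thread.currentThread().isInterrupted()) {
            try {
              Thread.sleep(HEARTBEAT_INTERVAL_MS); // pace calls to the RM
              heartbeat(); // ask for / release containers, handle response
            } catch (InterruptedException e) {
              // stop() interrupts and then joins this thread; returning here
              // is the normal shutdown path, so WARN (not ERROR) is apt.
              return;
            }
          }
        }
      });

      void heartbeat() {
        // stand-in for the real allocate-and-schedule work
      }

      void start() { allocatorThread.start(); }

      void stop() throws InterruptedException {
        allocatorThread.interrupt();
        allocatorThread.join();
      }
    }
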
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index d188b7c42e9..bcb82230d6a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -254,28 +255,30 @@ public void handle(ContainerAllocatorEvent event) {
 
   @SuppressWarnings({ "unchecked" })
   protected synchronized void handleEvent(ContainerAllocatorEvent event) {
-    LOG.info("Processing the event " + event.toString());
     recalculateReduceSchedule = true;
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
+      JobId jobId = getJob().getID();
+      int supportedMaxContainerCapability =
+          getMaxContainerCapability().getMemory();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
         if (mapResourceReqt == 0) {
           mapResourceReqt = reqEvent.getCapability().getMemory();
           int minSlotMemSize = getMinContainerCapability().getMemory();
           mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
               * minSlotMemSize;
-          eventHandler.handle(new JobHistoryEvent(getJob().getID(),
+          eventHandler.handle(new JobHistoryEvent(jobId,
               new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
                   mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
-          if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
+          if (mapResourceReqt > supportedMaxContainerCapability) {
             String diagMsg = "MAP capability required is more than the supported " +
             "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
-                mapResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+                mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                getJob().getID(), diagMsg));
-            eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+                jobId, diagMsg));
+            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
         //set the rounded off memory
@@ -288,20 +291,20 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
           //round off on slotsize
           reduceResourceReqt = (int) Math.ceil((float)
               reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
-          eventHandler.handle(new JobHistoryEvent(getJob().getID(),
+          eventHandler.handle(new JobHistoryEvent(jobId,
               new NormalizedResourceEvent(
                   org.apache.hadoop.mapreduce.TaskType.REDUCE,
                   reduceResourceReqt)));
           LOG.info("reduceResourceReqt:"+reduceResourceReqt);
-          if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
+          if (reduceResourceReqt > supportedMaxContainerCapability) {
             String diagMsg = "REDUCE capability required is more than the " +
                 "supported max container capability in the cluster. Killing the " +
                 "Job. reduceResourceReqt: " + reduceResourceReqt +
-                " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+                " maxContainerCapability:" + supportedMaxContainerCapability;
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                getJob().getID(), diagMsg));
-            eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
+                jobId, diagMsg));
+            eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
         //set the rounded off memory
@@ -317,6 +320,9 @@ protected synchronized void handleEvent(ContainerAllocatorEvent event) {
 
     } else if (
         event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
+
+      LOG.info("Processing the event " + event.toString());
+
       TaskAttemptId aId = event.getAttemptID();
 
       boolean removed = scheduledRequests.remove(aId);
@@ -579,7 +585,7 @@ private List<Container> getResources() throws Exception {
     computeIgnoreBlacklisting();
 
     for (ContainerStatus cont : finishedContainers) {
-      LOG.info("Received completed container " + cont);
+      LOG.info("Received completed container " + cont.getContainerId());
       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
       if (attemptID == null) {
         LOG.error("Container complete event for unknown container id "
@@ -664,7 +670,9 @@ void addMap(ContainerRequestEvent event) {
           mapsHostMapping.put(host, list);
         }
         list.add(event.getAttemptID());
-        LOG.info("Added attempt req to host " + host);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to host " + host);
+        }
       }
       for (String rack: event.getRacks()) {
         LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
@@ -673,7 +681,9 @@ void addMap(ContainerRequestEvent event) {
          mapsRackMapping.put(rack, list);
         }
         list.add(event.getAttemptID());
-        LOG.info("Added attempt req to rack " + rack);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to rack " + rack);
+        }
       }
       request = new ContainerRequest(event, PRIORITY_MAP);
     }
@@ -694,18 +704,21 @@ private void assign(List<Container> allocatedContainers) {
     containersAllocated += allocatedContainers.size();
     while (it.hasNext()) {
       Container allocated = it.next();
-      LOG.info("Assigning container " + allocated.getId() +
-          " with priority " + allocated.getPriority() +
-          " to NM " + allocated.getNodeId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Assigning container " + allocated.getId()
+            + " with priority " + allocated.getPriority() + " to NM "
+            + allocated.getNodeId());
+      }
 
       // check if allocated container meets memory requirements
       // and whether we have any scheduled tasks that need
       // a container to be assigned
       boolean isAssignable = true;
       Priority priority = allocated.getPriority();
+      int allocatedMemory = allocated.getResource().getMemory();
       if (PRIORITY_FAST_FAIL_MAP.equals(priority)
           || PRIORITY_MAP.equals(priority)) {
-        if (allocated.getResource().getMemory() < mapResourceReqt
+        if (allocatedMemory < mapResourceReqt
             || maps.isEmpty()) {
           LOG.info("Cannot assign container " + allocated
               + " for a map as either "
@@ -716,7 +729,7 @@ private void assign(List<Container> allocatedContainers) {
         }
       }
       else if (PRIORITY_REDUCE.equals(priority)) {
-        if (allocated.getResource().getMemory() < reduceResourceReqt
+        if (allocatedMemory < reduceResourceReqt
             || reduces.isEmpty()) {
           LOG.info("Cannot assign container " + allocated
               + " for a reduce as either "
@@ -730,15 +743,17 @@ else if (PRIORITY_REDUCE.equals(priority)) {
 
       boolean blackListed = false;
       ContainerRequest assigned = null;
 
+      ContainerId allocatedContainerId = allocated.getId();
       if (isAssignable) {
         // do not assign if allocated container is on a
         // blacklisted host
-        blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+        String allocatedHost = allocated.getNodeId().getHost();
+        blackListed = isNodeBlacklisted(allocatedHost);
         if (blackListed) {
           // we need to request for a new container
           // and release the current one
           LOG.info("Got allocated container on a blacklisted "
-              + " host "+allocated.getNodeId().getHost()
+              + " host "+allocatedHost
               +". Releasing container " + allocated);
 
           // find the request matching this allocated container
@@ -775,11 +790,13 @@ else if (PRIORITY_REDUCE.equals(priority)) {
           eventHandler.handle(new TaskAttemptContainerAssignedEvent(
               assigned.attemptID, allocated, applicationACLs));
 
-          assignedRequests.add(allocated.getId(), assigned.attemptID);
+          assignedRequests.add(allocatedContainerId, assigned.attemptID);
 
-          LOG.info("Assigned container (" + allocated + ") " +
-              " to task " + assigned.attemptID +
-              " on node " + allocated.getNodeId().toString());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Assigned container (" + allocated + ") "
+                + " to task " + assigned.attemptID + " on node "
+                + allocated.getNodeId().toString());
+          }
         }
         else {
           //not assigned to any request, release the container
@@ -794,7 +811,7 @@ else if (PRIORITY_REDUCE.equals(priority)) {
         // or if we could not assign it
         if (blackListed || assigned == null) {
           containersReleased++;
-          release(allocated.getId());
+          release(allocatedContainerId);
         }
       }
     }
@@ -807,10 +824,14 @@ private ContainerRequest assign(Container allocated) {
       LOG.info("Assigning container " + allocated + " to fast fail map");
       assigned = assignToFailedMap(allocated);
     } else if (PRIORITY_REDUCE.equals(priority)) {
-      LOG.info("Assigning container " + allocated + " to reduce");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Assigning container " + allocated + " to reduce");
+      }
       assigned = assignToReduce(allocated);
     } else if (PRIORITY_MAP.equals(priority)) {
-      LOG.info("Assigning container " + allocated + " to map");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Assigning container " + allocated + " to map");
+      }
       assigned = assignToMap(allocated);
     } else {
       LOG.warn("Container allocated at unwanted priority: " + priority +
@@ -897,7 +918,9 @@ private ContainerRequest assignToMap(Container allocated) {
       String host = allocated.getNodeId().getHost();
       LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
       while (list != null && list.size() > 0) {
-        LOG.info("Host matched to the request list " + host);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Host matched to the request list " + host);
+        }
         TaskAttemptId tId = list.removeFirst();
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
@@ -906,7 +929,9 @@ private ContainerRequest assignToMap(Container allocated) {
           jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
           eventHandler.handle(jce);
           hostLocalAssigned++;
-          LOG.info("Assigned based on host match " + host);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Assigned based on host match " + host);
+          }
           break;
         }
       }
@@ -922,7 +947,9 @@ private ContainerRequest assignToMap(Container allocated) {
           jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
           eventHandler.handle(jce);
           rackLocalAssigned++;
-          LOG.info("Assigned based on rack match " + rack);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Assigned based on rack match " + rack);
+          }
           break;
         }
       }
@@ -933,7 +960,9 @@ private ContainerRequest assignToMap(Container allocated) {
             new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
         jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
         eventHandler.handle(jce);
-        LOG.info("Assigned based on * match");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigned based on * match");
+        }
         break;
       }
     }
@@ -953,8 +982,7 @@ private class AssignedRequests {
       new HashSet<TaskAttemptId>();
 
     void add(ContainerId containerId, TaskAttemptId tId) {
-      LOG.info("Assigned container " + containerId.toString()
-          + " to " + tId);
+      LOG.info("Assigned container " + containerId.toString() + " to " + tId);
       containerToAttemptMap.put(containerId, tId);
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
         maps.put(tId, containerId);
@@ -963,6 +991,7 @@ void add(ContainerId containerId, TaskAttemptId tId) {
       }
     }
 
+    @SuppressWarnings("unchecked")
    void preemptReduce(int toPreempt) {
       List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
           (reduces.keySet());
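
The assignToMap hunks above implement three-tier locality: first try a task that asked for the container's host, then one that asked for its rack, then any remaining map (the "*" queue), bumping DATA_LOCAL_MAPS, RACK_LOCAL_MAPS, or OTHER_LOCAL_MAPS accordingly. A condensed sketch of that lookup order under simplified types -- Deque<Integer> stands in for the LinkedList of TaskAttemptIds, so this is the shape of the algorithm, not the real code:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;

    public class LocalityMatchSketch {
      private final Map<String, Deque<Integer>> byHost =
          new HashMap<String, Deque<Integer>>();
      private final Map<String, Deque<Integer>> byRack =
          new HashMap<String, Deque<Integer>>();
      private final Deque<Integer> anyHost = new ArrayDeque<Integer>();

      Integer assign(String host, String rack) {
        Deque<Integer> hostList = byHost.get(host);
        if (hostList != null && !hostList.isEmpty()) {
          return hostList.pollFirst();  // data-local: DATA_LOCAL_MAPS
        }
        Deque<Integer> rackList = byRack.get(rack);
        if (rackList != null && !rackList.isEmpty()) {
          return rackList.pollFirst();  // rack-local: RACK_LOCAL_MAPS
        }
        return anyHost.pollFirst();     // off-switch: OTHER_LOCAL_MAPS
      }
    }

The real method additionally re-checks each candidate against the live maps table, since one attempt can sit in several locality queues at once.
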
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index 7aa638afe6b..ea3101a68d5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -155,13 +155,14 @@ protected AMResponse makeRemoteRequest() throws YarnRemoteException {
     lastClusterNmCount = clusterNmCount;
     clusterNmCount = allocateResponse.getNumClusterNodes();
 
-    LOG.info("getResources() for " + applicationId + ":" + " ask="
-        + ask.size() + " release= " + release.size() +
-        " newContainers=" + response.getAllocatedContainers().size() +
-        " finishedContainers=" + response.getCompletedContainersStatuses().size() +
-        " resourcelimit=" + availableResources +
-        " knownNMs=" + clusterNmCount);
+    if (ask.size() > 0 || release.size() > 0) {
+      LOG.info("getResources() for " + applicationId + ":" + " ask="
+          + ask.size() + " release= " + release.size() + " newContainers="
+          + response.getAllocatedContainers().size() + " finishedContainers="
+          + response.getCompletedContainersStatuses().size()
+          + " resourcelimit=" + availableResources + " knownNMs="
+          + clusterNmCount);
+    }
 
     ask.clear();
     release.clear();
@@ -172,6 +173,9 @@ protected AMResponse makeRemoteRequest() throws YarnRemoteException {
   // knownNodeCount is based on node managers, not hosts. blacklisting is
   // currently based on hosts.
   protected void computeIgnoreBlacklisting() {
+    if (!nodeBlacklistingEnabled) {
+      return;
+    }
     if (blacklistDisablePercent != -1
         && (blacklistedNodeCount != blacklistedNodes.size() ||
             clusterNmCount != lastClusterNmCount)) {
@@ -200,7 +204,9 @@ protected void containerFailedOnHost(String hostName) {
       return;
     }
     if (blacklistedNodes.contains(hostName)) {
-      LOG.info("Host " + hostName + " is already blacklisted.");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Host " + hostName + " is already blacklisted.");
+      }
       return; //already blacklisted
     }
     Integer failures = nodeFailures.remove(hostName);
@@ -293,7 +299,9 @@ private void addResourceRequest(Priority priority, String resourceName,
     if (remoteRequests == null) {
       remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
       this.remoteRequestsTable.put(priority, remoteRequests);
-      LOG.info("Added priority=" + priority);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
     }
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
     if (reqMap == null) {
@@ -313,10 +321,12 @@ private void addResourceRequest(Priority priority, String resourceName,
 
     // Note this down for next interaction with ResourceManager
     ask.add(remoteRequest);
-    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
   }
 
   private void decResourceRequest(Priority priority, String resourceName,
@@ -328,16 +338,20 @@ private void decResourceRequest(Priority priority, String resourceName,
       // as we modify the resource requests by filtering out blacklisted hosts
       // when they are added, this value may be null when being
       // decremented
-      LOG.debug("Not decrementing resource as " + resourceName
-          + " is not present in request table");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as " + resourceName
+            + " is not present in request table");
+      }
       return;
     }
     ResourceRequest remoteRequest = reqMap.get(capability);
 
-    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
 
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
     if (remoteRequest.getNumContainers() == 0) {
@@ -355,10 +369,12 @@ private void decResourceRequest(Priority priority, String resourceName,
       //already have it.
     }
 
-    LOG.info("AFTER decResourceRequest:" + " applicationId="
-        + applicationId.getId() + " priority=" + priority.getPriority()
-        + " resourceName=" + resourceName + " numContainers="
-        + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AFTER decResourceRequest:" + " applicationId="
+          + applicationId.getId() + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+    }
   }
 
   protected void release(ContainerId containerId) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 3eb214d79c9..00bdaebfe80 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -436,7 +436,7 @@ protected ClientService createClientService(AppContext context) {
     return new ClientService(){
       @Override
       public InetSocketAddress getBindAddress() {
-        return null;
+        return NetUtils.createSocketAddr("localhost:9876");
       }
 
       @Override
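
addResourceRequest and decResourceRequest above index a nested ask table keyed by Priority, then resource name, then Resource capability. Lookups in that table are exactly why this patch gives the records cheap field-based hashCode/equals further down: the earlier ProtoBase implementations had to materialize the protobuf message to compare, which is expensive on a per-event path. A sketch of the table shape (field names are illustrative, and the real outer map may be sorted by Priority rather than hashed -- either way both hashCode and compareTo on these records get exercised constantly):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;

    public class RequestTableSketch {
      // Priority and Resource serve as map keys here, so get()/put() find
      // existing entries only if hashCode/equals are defined on field values.
      private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
          remoteRequestsTable =
              new HashMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();

      ResourceRequest lookup(Priority priority, String resourceName,
          Resource capability) {
        Map<String, Map<Resource, ResourceRequest>> byName =
            remoteRequestsTable.get(priority);     // needs Priority.hashCode
        if (byName == null) {
          return null;
        }
        Map<Resource, ResourceRequest> byCapability = byName.get(resourceName);
        if (byCapability == null) {
          return null;
        }
        return byCapability.get(capability);       // needs Resource.hashCode
      }
    }
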
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
index 279b81199ae..ebb20b06565 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -29,16 +31,30 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.Test;
 
 public class MRAppBenchmark {
 
@@ -167,17 +183,89 @@ public void stop() {
     }
   }
 
+  @Test
   public void benchmark1() throws Exception {
-    int maps = 100000;
-    int reduces = 100;
+    int maps = 100; // Adjust for benchmarking. Start with thousands.
+    int reduces = 0;
     System.out.println("Running benchmark with maps:"+maps +
         " reduces:"+reduces);
-    run(new MRApp(maps, reduces, true, this.getClass().getName(), true));
+    run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+
+      @Override
+      protected ContainerAllocator createContainerAllocator(
+          ClientService clientService, AppContext context) {
+        return new RMContainerAllocator(clientService, context) {
+          @Override
+          protected AMRMProtocol createSchedulerProxy() {
+            return new AMRMProtocol() {
+
+              @Override
+              public RegisterApplicationMasterResponse
+                  registerApplicationMaster(
+                      RegisterApplicationMasterRequest request)
+                      throws YarnRemoteException {
+                RegisterApplicationMasterResponse response =
+                    Records.newRecord(RegisterApplicationMasterResponse.class);
+                response.setMinimumResourceCapability(BuilderUtils
+                    .newResource(1024));
+                response.setMaximumResourceCapability(BuilderUtils
+                    .newResource(10240));
+                return response;
+              }
+
+              @Override
+              public FinishApplicationMasterResponse finishApplicationMaster(
+                  FinishApplicationMasterRequest request)
+                  throws YarnRemoteException {
+                FinishApplicationMasterResponse response =
+                    Records.newRecord(FinishApplicationMasterResponse.class);
+                return response;
+              }
+
+              @Override
+              public AllocateResponse allocate(AllocateRequest request)
+                  throws YarnRemoteException {
+
+                AllocateResponse response =
+                    Records.newRecord(AllocateResponse.class);
+                List<ResourceRequest> askList = request.getAskList();
+                List<Container> containers = new ArrayList<Container>();
+                for (ResourceRequest req : askList) {
+                  if (!"*".equals(req.getHostName())) {
+                    continue;
+                  }
+                  int numContainers = req.getNumContainers();
+                  for (int i = 0; i < numContainers; i++) {
+                    ContainerId containerId =
+                        BuilderUtils.newContainerId(
+                            request.getApplicationAttemptId(),
+                            request.getResponseId() + i);
+                    containers.add(BuilderUtils
+                        .newContainer(containerId, BuilderUtils.newNodeId("host"
+                            + containerId.getId(), 2345),
+                            "host" + containerId.getId() + ":5678", req
+                                .getCapability(), req.getPriority(), null));
+                  }
+                }
+
+                AMResponse amResponse = Records.newRecord(AMResponse.class);
+                amResponse.setAllocatedContainers(containers);
+                amResponse.setResponseId(request.getResponseId() + 1);
+                response.setAMResponse(amResponse);
+                response.setNumClusterNodes(350);
+                return response;
+              }
+            };
+          }
+        };
+      }
+    });
   }
 
+  @Test
   public void benchmark2() throws Exception {
-    int maps = 4000;
-    int reduces = 1000;
+    int maps = 100; // Adjust for benchmarking; start with a couple of thousand.
+    int reduces = 50;
     int maxConcurrentRunningTasks = 500;
 
     System.out.println("Running benchmark with throttled running tasks with " +
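
With the @Test annotations added, both benchmarks run under the module's normal Surefire setup; a typical invocation from hadoop-mapreduce-client-app (standard Maven usage, not something this patch adds) would be:

    mvn test -Dtest=MRAppBenchmark

The committed defaults are deliberately small so routine test runs stay fast; as the inline comments say, raise maps into the thousands for a real measurement. Note that the stubbed AMRMProtocol grants every "*" ask immediately, so the run measures the AM's own scheduling and event paths rather than a live ResourceManager.
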
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
index bed0788365e..fea1f48aea3 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
@@ -23,7 +23,7 @@
  * allocation
  *
  */
-public interface Priority extends Comparable<Priority> {
+public abstract class Priority implements Comparable<Priority> {
 
   /**
    * Get the assigned priority
@@ -37,4 +37,31 @@ public interface Priority extends Comparable<Priority> {
    */
   public abstract void setPriority(int priority);
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + getPriority();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    Priority other = (Priority) obj;
+    if (getPriority() != other.getPriority())
+      return false;
+    return true;
+  }
+
+  @Override
+  public int compareTo(Priority other) {
+    return this.getPriority() - other.getPriority();
+  }
+
 }
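
With hashCode and equals now defined on the abstract record in terms of getPriority(), two Priority instances carrying the same value collapse to a single map or set key no matter how they were constructed -- which is what the allocator's and requestor's priority-keyed tables rely on. (The subtraction-based compareTo is fine for YARN's small priority values, though it would overflow for values near Integer.MIN_VALUE.) A small illustrative check, instantiating through the records factory the way the benchmark above does:

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.util.Records;

    public class PriorityEqualityCheck {
      public static void main(String[] args) {
        Priority a = Records.newRecord(Priority.class);
        a.setPriority(5);
        Priority b = Records.newRecord(Priority.class);
        b.setPriority(5);

        Set<Priority> set = new HashSet<Priority>();
        set.add(a);
        set.add(b);
        // Equal value => equal records in the same hash bucket: one element.
        System.out.println(set.size()); // prints 1
      }
    }
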
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 4072da1b613..72d703eeaf7 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -48,7 +48,7 @@
  */
 @Public
 @Stable
-public interface ResourceRequest extends Comparable<ResourceRequest> {
+public abstract class ResourceRequest implements Comparable<ResourceRequest> {
   /**
    * Get the Priority of the request.
    * @return Priority of the request
@@ -121,4 +121,79 @@ public interface ResourceRequest extends Comparable<ResourceRequest> {
   @Public
   @Stable
   public abstract void setNumContainers(int numContainers);
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    Resource capability = getCapability();
+    String hostName = getHostName();
+    Priority priority = getPriority();
+    result =
+        prime * result + ((capability == null) ? 0 : capability.hashCode());
+    result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
+    result = prime * result + getNumContainers();
+    result = prime * result + ((priority == null) ? 0 : priority.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ResourceRequest other = (ResourceRequest) obj;
+    Resource capability = getCapability();
+    if (capability == null) {
+      if (other.getCapability() != null)
+        return false;
+    } else if (!capability.equals(other.getCapability()))
+      return false;
+    String hostName = getHostName();
+    if (hostName == null) {
+      if (other.getHostName() != null)
+        return false;
+    } else if (!hostName.equals(other.getHostName()))
+      return false;
+    if (getNumContainers() != other.getNumContainers())
+      return false;
+    Priority priority = getPriority();
+    if (priority == null) {
+      if (other.getPriority() != null)
+        return false;
+    } else if (!priority.equals(other.getPriority()))
+      return false;
+    return true;
+  }
+
+  @Override
+  public int compareTo(ResourceRequest other) {
+    int priorityComparison = this.getPriority().compareTo(other.getPriority());
+    if (priorityComparison == 0) {
+      int hostNameComparison =
+          this.getHostName().compareTo(other.getHostName());
+      if (hostNameComparison == 0) {
+        int capabilityComparison =
+            this.getCapability().compareTo(other.getCapability());
+        if (capabilityComparison == 0) {
+          int numContainersComparison =
+              this.getNumContainers() - other.getNumContainers();
+          if (numContainersComparison == 0) {
+            return 0;
+          } else {
+            return numContainersComparison;
+          }
+        } else {
+          return capabilityComparison;
+        }
+      } else {
+        return hostNameComparison;
+      }
+    } else {
+      return priorityComparison;
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/PriorityPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/PriorityPBImpl.java
index 11bf6d0dd3d..9a3f9bb2c95 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/PriorityPBImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/PriorityPBImpl.java
@@ -18,15 +18,11 @@
 
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
-
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProtoOrBuilder;
 
-
-
-public class PriorityPBImpl extends ProtoBase<PriorityProto> implements Priority {
+public class PriorityPBImpl extends Priority {
   PriorityProto proto = PriorityProto.getDefaultInstance();
   PriorityProto.Builder builder = null;
   boolean viaProto = false;
@@ -66,11 +62,4 @@ public void setPriority(int priority) {
     builder.setPriority((priority));
   }
 
-
-  @Override
-  public int compareTo(Priority other) {
-    return this.getPriority() - other.getPriority();
-  }
-
-
 }
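
ResourceRequest.compareTo above orders requests by priority first, then host name, then capability, then container count, so a sorted collection of requests groups naturally by priority and locality; the requestor's outgoing ask set depends on this ordering. A small sketch of the effect -- the ask(...) helper is invented for the example, everything else is the era's real API:

    import java.util.Set;
    import java.util.TreeSet;

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.ResourceRequest;
    import org.apache.hadoop.yarn.util.BuilderUtils;
    import org.apache.hadoop.yarn.util.Records;

    public class AskOrderingSketch {
      // Hypothetical helper: build a request field by field.
      static ResourceRequest ask(int priority, String host, int mem, int num) {
        ResourceRequest r = Records.newRecord(ResourceRequest.class);
        Priority p = Records.newRecord(Priority.class);
        p.setPriority(priority);
        r.setPriority(p);
        r.setHostName(host);
        r.setCapability(BuilderUtils.newResource(mem));
        r.setNumContainers(num);
        return r;
      }

      public static void main(String[] args) {
        Set<ResourceRequest> asks = new TreeSet<ResourceRequest>();
        asks.add(ask(20, "*", 1024, 10));    // reduce-priority, any host
        asks.add(ask(5, "host1", 1024, 2));  // map-priority, host-local
        asks.add(ask(5, "*", 1024, 2));      // map-priority, any host
        // Iterates priority 5 before 20, and within a priority by hostName
        // ("*" sorts before "host1") -- the compareTo chain defined above.
        for (ResourceRequest r : asks) {
          System.out.println(
              r.getPriority().getPriority() + " " + r.getHostName());
        }
      }
    }
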
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 33dba0d6c02..f3b8ffa89f9 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -20,19 +20,14 @@
 
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProtoOrBuilder;
 
-
-
-public class ResourceRequestPBImpl extends ProtoBase<ResourceRequestProto> implements ResourceRequest {
+public class ResourceRequestPBImpl extends ResourceRequest {
   ResourceRequestProto proto = ResourceRequestProto.getDefaultInstance();
   ResourceRequestProto.Builder builder = null;
   boolean viaProto = false;
@@ -168,25 +163,4 @@ private ResourceProto convertToProtoFormat(Resource t) {
     return ((ResourcePBImpl)t).getProto();
   }
 
-  @Override
-  public int compareTo(ResourceRequest other) {
-    if (this.getPriority().compareTo(other.getPriority()) == 0) {
-      if (this.getHostName().equals(other.getHostName())) {
-        if (this.getCapability().equals(other.getCapability())) {
-          if (this.getNumContainers() == other.getNumContainers()) {
-            return 0;
-          } else {
-            return this.getNumContainers() - other.getNumContainers();
-          }
-        } else {
-          return this.getCapability().compareTo(other.getCapability());
-        }
-      } else {
-        return this.getHostName().compareTo(other.getHostName());
-      }
-    } else {
-      return this.getPriority().compareTo(other.getPriority());
-    }
-  }
-
 }
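
One subtlety of the equals() the PB implementations now inherit: it compares getClass(), so a ResourceRequestPBImpl never equals a ResourceRequest backed by a different implementation class, even when every field matches. Within the AM this is harmless because all records come from the same protobuf record factory, but it is worth remembering when stubbing records in tests, as MRAppBenchmark does above.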