diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index b9d167b77a3..c7aace0597f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -77,6 +77,9 @@ Trunk (Unreleased) MAPREDUCE-5189. Add policies and wiring to respond to preemption requests from YARN. (Carlo Curino via cdouglas) + MAPREDUCE-5196. Add bookkeeping for managing checkpoints of task state. + (Carlo Curino via cdouglas) + BUG FIXES MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 8af7e379873..9f4b9c7ec63 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -36,7 +36,9 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; @@ -45,8 +47,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; @@ -228,6 +230,22 @@ public class TaskAttemptListenerImpl extends CompositeService TaskAttemptEventType.TA_COMMIT_PENDING)); } + @Override + public void preempted(TaskAttemptID taskAttemptID, TaskStatus taskStatus) + throws IOException, InterruptedException { + LOG.info("Preempted state update from " + taskAttemptID.toString()); + // An attempt is telling us that it got preempted. 
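+ // Bookkeeping and plumbing (see the three calls below): tell the policy the + // preemption succeeded, keep the heartbeat handler from timing the attempt + // out while it wraps up, and move the attempt state machine to KILLED via + // TA_PREEMPTED.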
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = + TypeConverter.toYarn(taskAttemptID); + + preemptionPolicy.reportSuccessfulPreemption(attemptID); + taskHeartbeatHandler.progressing(attemptID); + + context.getEventHandler().handle( + new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_PREEMPTED)); + } + @Override public void done(TaskAttemptID taskAttemptID) throws IOException { LOG.info("Done acknowledgement from " + taskAttemptID.toString()); @@ -250,6 +268,10 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); + + // handling checkpoints + preemptionPolicy.handleFailedContainer(attemptID); + context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); } @@ -264,6 +286,10 @@ public class TaskAttemptListenerImpl extends CompositeService org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter.toYarn(taskAttemptID); + + // handling checkpoints + preemptionPolicy.handleFailedContainer(attemptID); + context.getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); } @@ -293,12 +319,6 @@ public class TaskAttemptListenerImpl extends CompositeService return new MapTaskCompletionEventsUpdate(events, shouldReset); } - @Override - public boolean ping(TaskAttemptID taskAttemptID) throws IOException { - LOG.info("Ping from " + taskAttemptID.toString()); - return true; - } - @Override public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo) throws IOException { @@ -321,11 +341,33 @@ public class TaskAttemptListenerImpl extends CompositeService } @Override - public boolean statusUpdate(TaskAttemptID taskAttemptID, + public AMFeedback statusUpdate(TaskAttemptID taskAttemptID, TaskStatus taskStatus) throws IOException, InterruptedException { - LOG.info("Status update from " + taskAttemptID.toString()); + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = TypeConverter.toYarn(taskAttemptID); + + AMFeedback feedback = new AMFeedback(); + feedback.setTaskFound(true); + + // Propagating preemption to the task if TASK_PREEMPTION is enabled + if (getConfig().getBoolean(MRJobConfig.TASK_PREEMPTION, false) + && preemptionPolicy.isPreempted(yarnAttemptID)) { + feedback.setPreemption(true); + LOG.info("Setting preemption bit for task: "+ yarnAttemptID + + " of type " + yarnAttemptID.getTaskId().getTaskType()); + } + + if (taskStatus == null) { + // We are using statusUpdate only as a simple ping + LOG.info("Ping from " + taskAttemptID.toString()); + taskHeartbeatHandler.progressing(yarnAttemptID); + return feedback; + } + + // if we are here there is an actual status update to be processed + LOG.info("Status update from " + taskAttemptID.toString()); + taskHeartbeatHandler.progressing(yarnAttemptID); TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); @@ -386,7 +428,7 @@ public class TaskAttemptListenerImpl extends CompositeService context.getEventHandler().handle( new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id, taskAttemptStatus)); - return true; + return feedback; } @Override @@ -494,4 +536,18 @@ public class TaskAttemptListenerImpl extends CompositeService return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash); } + + // task checkpoint bookkeeping + @Override + public TaskCheckpointID getCheckpointID(TaskID taskId) { + TaskId tid = TypeConverter.toYarn(taskId);
+ return preemptionPolicy.getCheckpointID(tid); + } + + @Override + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + TaskId tid = TypeConverter.toYarn(taskId); + preemptionPolicy.setCheckpointID(tid, cid); + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java index a43263264e9..1f05ac30aaf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java @@ -47,6 +47,7 @@ public enum TaskAttemptEventType { TA_FAILMSG, TA_UPDATE, TA_TIMED_OUT, + TA_PREEMPTED, //Producer:TaskCleaner TA_CLEANUP_DONE, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 5e14ce1cb52..37c5064b182 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -304,6 +304,9 @@ public abstract class TaskAttemptImpl implements .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) + .addTransition(TaskAttemptStateInternal.RUNNING, + TaskAttemptStateInternal.KILLED, + TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition()) // Transitions from COMMIT_PENDING state .addTransition(TaskAttemptStateInternal.COMMIT_PENDING, @@ -437,6 +440,7 @@ public abstract class TaskAttemptImpl implements TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_CONTAINER_CLEANED, + TaskAttemptEventType.TA_PREEMPTED, // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)) @@ -1874,6 +1878,27 @@ public abstract class TaskAttemptImpl implements } } + + private static class PreemptedTransition implements + SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + @SuppressWarnings("unchecked") + @Override + public void transition(TaskAttemptImpl taskAttempt, + TaskAttemptEvent event) { + taskAttempt.setFinishTime(); + taskAttempt.taskAttemptListener.unregister( + taskAttempt.attemptId, taskAttempt.jvmID); + taskAttempt.eventHandler.handle(new ContainerLauncherEvent( + taskAttempt.attemptId, + taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(), + taskAttempt.container.getContainerToken(), + ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); + taskAttempt.eventHandler.handle(new TaskTAttemptEvent( + taskAttempt.attemptId, + TaskEventType.T_ATTEMPT_KILLED)); + + } + } + private static class CleanupContainerTransition implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { @SuppressWarnings("unchecked") diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index dd739f2b7c3..18491fdbf1d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -347,7 +347,7 @@ public class RMContainerAllocator extends RMContainerRequestor } } else if ( - event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) { + event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) { LOG.info("Processing the event " + event.toString()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java index 0bbe75bdea3..85211f958d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java @@ -19,10 +19,9 @@ package org.apache.hadoop.mapreduce.v2.app.rm.preemption; import java.util.List; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.yarn.api.records.Container; @@ -81,7 +80,7 @@ public interface AMPreemptionPolicy { * successfully preempted (for bookkeeping, counters, etc.) * @param attemptID Task attempt that was preempted */ - public void reportSuccessfulPreemption(TaskAttemptID attemptID); + public void reportSuccessfulPreemption(TaskAttemptId attemptID); /** * Callback informing the policy of containers exiting with a failure. This @@ -98,20 +97,20 @@ public interface AMPreemptionPolicy { public void handleCompletedContainer(TaskAttemptId attemptID); /** - * Method to retrieve the latest checkpoint for a given {@link TaskID} + * Method to retrieve the latest checkpoint for a given {@link TaskId} * @param taskId TaskId * @return CheckpointID associated with this task or null */ - public TaskCheckpointID getCheckpointID(TaskID taskId); + public TaskCheckpointID getCheckpointID(TaskId taskId); /** * Method to store the latest {@link * org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link - * TaskID}. Assigning a null is akin to remove all previous checkpoints for + * TaskId}. Assigning a null is akin to removing all previous checkpoints for * this task. * @param taskId TaskId * @param cid Checkpoint to assign or null to remove it.
*/ - public void setCheckpointID(TaskID taskId, TaskCheckpointID cid); + public void setCheckpointID(TaskId taskId, TaskCheckpointID cid); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java new file mode 100644 index 00000000000..57e0bce1ff7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionContract; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; +import org.apache.hadoop.yarn.event.EventHandler; + +/** + * This policy works in combination with an implementation of task + * checkpointing. It computes the tasks to be preempted in response to the RM + * request for preemption. For strict requests, it maps containers to + * corresponding tasks; for fungible requests, it attempts to pick the best + * containers to preempt (reducers in reverse allocation order). The + * TaskAttemptListener will interrogate this policy when handling a task + * heartbeat to check whether the task should be preempted or not. 
When handling + * fungible requests, the policy discounts the RM ask by the amount of currently + * in-flight preemptions (i.e., tasks that are checkpointing). + * + * This class is also used to maintain the list of checkpoints for existing + * tasks. Centralizing this functionality here allows us to have visibility on + * preemption and checkpoints in a single location, thus coordinating preemption + * and checkpoint management decisions in a single policy. + */ +public class CheckpointAMPreemptionPolicy implements AMPreemptionPolicy { + + // task attempts flagged for preemption + private final Set<TaskAttemptId> toBePreempted; + + private final Set<TaskAttemptId> countedPreemptions; + + private final Map<TaskId, TaskCheckpointID> checkpoints; + + private final Map<TaskAttemptId, Resource> pendingFlexiblePreemptions; + + @SuppressWarnings("rawtypes") + private EventHandler eventHandler; + + static final Log LOG = LogFactory + .getLog(CheckpointAMPreemptionPolicy.class); + + public CheckpointAMPreemptionPolicy() { + this(Collections.synchronizedSet(new HashSet<TaskAttemptId>()), + Collections.synchronizedSet(new HashSet<TaskAttemptId>()), + Collections.synchronizedMap(new HashMap<TaskId, TaskCheckpointID>()), + Collections.synchronizedMap(new HashMap<TaskAttemptId, Resource>())); + } + + CheckpointAMPreemptionPolicy(Set<TaskAttemptId> toBePreempted, + Set<TaskAttemptId> countedPreemptions, + Map<TaskId, TaskCheckpointID> checkpoints, + Map<TaskAttemptId, Resource> pendingFlexiblePreemptions) { + this.toBePreempted = toBePreempted; + this.countedPreemptions = countedPreemptions; + this.checkpoints = checkpoints; + this.pendingFlexiblePreemptions = pendingFlexiblePreemptions; + } + + @Override + public void init(AppContext context) { + this.eventHandler = context.getEventHandler(); + } + + @Override + public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { + + if (preemptionRequests != null) { + + // handling non-negotiable preemption + + StrictPreemptionContract cStrict = preemptionRequests.getStrictContract(); + if (cStrict != null + && cStrict.getContainers() != null + && cStrict.getContainers().size() > 0) { + LOG.info("strict preemption :" + + preemptionRequests.getStrictContract().getContainers().size() + + " containers to kill"); + + // handle strict preemptions. These containers are non-negotiable + for (PreemptionContainer c : + preemptionRequests.getStrictContract().getContainers()) { + ContainerId reqCont = c.getId(); + TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont); + if (reqTask != null) { + // ignore requests for preempting containers running maps + if (org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE + .equals(reqTask.getTaskId().getTaskType())) { + toBePreempted.add(reqTask); + LOG.info("preempting " + reqCont + " running task:" + reqTask); + } else { + LOG.info("NOT preempting " + reqCont + " running task:" + reqTask); + } + } + } + } + + // handling negotiable preemption + PreemptionContract cNegot = preemptionRequests.getContract(); + if (cNegot != null + && cNegot.getResourceRequest() != null + && cNegot.getResourceRequest().size() > 0 + && cNegot.getContainers() != null + && cNegot.getContainers().size() > 0) { + + LOG.info("negotiable preemption :" + + preemptionRequests.getContract().getResourceRequest().size() + + " resourceReq, " + + preemptionRequests.getContract().getContainers().size() + + " containers"); + // handle fungible preemption. Here we only look at the total amount of + // resources to be preempted and pick enough of our containers to + // satisfy that. We only support checkpointing for reducers for now.
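+ // Worked example with hypothetical numbers (not part of the patch): if the + // RM asks to free 4 containers x 2048 MB = 8192 MB while flagged reducers + // totalling 6144 MB are still checkpointing, the discount below leaves only + // 2048 MB of additional reducers to preempt.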
+ List<PreemptionResourceRequest> reqResources = + preemptionRequests.getContract().getResourceRequest(); + + // compute the total amount of pending preemptions (to be discounted + // from current request) + int pendingPreemptionRam = 0; + int pendingPreemptionCores = 0; + for (Resource r : pendingFlexiblePreemptions.values()) { + pendingPreemptionRam += r.getMemory(); + pendingPreemptionCores += r.getVirtualCores(); + } + + // discount preemption request based on currently pending preemption + for (PreemptionResourceRequest rr : reqResources) { + ResourceRequest reqRsrc = rr.getResourceRequest(); + if (!ResourceRequest.ANY.equals(reqRsrc.getResourceName())) { + // For now, only respond to aggregate requests and ignore locality + continue; + } + + LOG.info("ResourceRequest:" + reqRsrc); + int reqCont = reqRsrc.getNumContainers(); + int reqMem = reqRsrc.getCapability().getMemory(); + int totalMemoryToRelease = reqCont * reqMem; + int reqCores = reqRsrc.getCapability().getVirtualCores(); + int totalCoresToRelease = reqCont * reqCores; + + // subtract the resources that are already being preempted + if (pendingPreemptionRam > 0) { + // if this goes negative the preemption loop below exits immediately + totalMemoryToRelease -= pendingPreemptionRam; + // decrement pending resources; if zero or negative we will + // ignore it while processing the next PreemptionResourceRequest + pendingPreemptionRam -= totalMemoryToRelease; + } + if (pendingPreemptionCores > 0) { + totalCoresToRelease -= pendingPreemptionCores; + pendingPreemptionCores -= totalCoresToRelease; + } + + // reverse order of allocation (for now) + List<Container> listOfCont = ctxt.getContainers(TaskType.REDUCE); + Collections.sort(listOfCont, new Comparator<Container>() { + @Override + public int compare(final Container o1, final Container o2) { + return o2.getId().getId() - o1.getId().getId(); + } + }); + + // preempt reducers first + for (Container cont : listOfCont) { + if (totalMemoryToRelease <= 0 && totalCoresToRelease <= 0) { + break; + } + TaskAttemptId reduceId = ctxt.getTaskAttempt(cont.getId()); + int cMem = cont.getResource().getMemory(); + int cCores = cont.getResource().getVirtualCores(); + + if (!toBePreempted.contains(reduceId)) { + totalMemoryToRelease -= cMem; + totalCoresToRelease -= cCores; + toBePreempted.add(reduceId); + pendingFlexiblePreemptions.put(reduceId, cont.getResource()); + } + LOG.info("ResourceRequest:" + reqRsrc + " satisfied preempting " + + reduceId); + } + // if maps were preemptable we would add them to toBePreempted here + } + } + } + } + + @Override + public void handleFailedContainer(TaskAttemptId attemptID) { + toBePreempted.remove(attemptID); + checkpoints.remove(attemptID.getTaskId()); + } + + @Override + public void handleCompletedContainer(TaskAttemptId attemptID) { + LOG.info(" task completed:" + attemptID); + toBePreempted.remove(attemptID); + pendingFlexiblePreemptions.remove(attemptID); + } + + @Override + public boolean isPreempted(TaskAttemptId yarnAttemptID) { + if (toBePreempted.contains(yarnAttemptID)) { + updatePreemptionCounters(yarnAttemptID); + return true; + } + return false; + } + + @Override + public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) { + // ignore + } + + @Override + public TaskCheckpointID getCheckpointID(TaskId taskId) { + return checkpoints.get(taskId); + } + + @Override + public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) { + checkpoints.put(taskId, cid); + if (cid != null) { + updateCheckpointCounters(taskId, cid); + } + } + + @SuppressWarnings({ "unchecked" }) + private void updateCheckpointCounters(TaskId taskId, TaskCheckpointID cid) { + JobCounterUpdateEvent
jce = new JobCounterUpdateEvent(taskId.getJobId()); + jce.addCounterUpdate(JobCounter.CHECKPOINTS, 1); + eventHandler.handle(jce); + jce = new JobCounterUpdateEvent(taskId.getJobId()); + jce.addCounterUpdate(JobCounter.CHECKPOINT_BYTES, cid.getCheckpointBytes()); + eventHandler.handle(jce); + jce = new JobCounterUpdateEvent(taskId.getJobId()); + jce.addCounterUpdate(JobCounter.CHECKPOINT_TIME, cid.getCheckpointTime()); + eventHandler.handle(jce); + + } + + @SuppressWarnings({ "unchecked" }) + private void updatePreemptionCounters(TaskAttemptId yarnAttemptID) { + if (!countedPreemptions.contains(yarnAttemptID)) { + countedPreemptions.add(yarnAttemptID); + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(yarnAttemptID + .getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1); + eventHandler.handle(jce); + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java index 100ef4f7af4..daf737a154c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java @@ -19,11 +19,10 @@ package org.apache.hadoop.mapreduce.v2.app.rm.preemption; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; @@ -89,17 +88,17 @@ public class KillAMPreemptionPolicy implements AMPreemptionPolicy { } @Override - public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) { + public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) { // ignore } @Override - public TaskCheckpointID getCheckpointID(TaskID taskId) { + public TaskCheckpointID getCheckpointID(TaskId taskId) { return null; } @Override - public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) { // ignore } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java index 0c020aca22b..f6cc7b1d918 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java @@ -17,10 +17,9 @@ */ package org.apache.hadoop.mapreduce.v2.app.rm.preemption; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -50,17 +49,17 @@ public class NoopAMPreemptionPolicy implements AMPreemptionPolicy { } @Override - public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) { + public void reportSuccessfulPreemption(TaskAttemptId taskAttemptID) { // ignore } @Override - public TaskCheckpointID getCheckpointID(TaskID taskId) { + public TaskCheckpointID getCheckpointID(TaskId taskId) { return null; } @Override - public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + public void setCheckpointID(TaskId taskId, TaskCheckpointID cid) { // ignore } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index ba8e3d30261..6563cda9aec 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -17,26 +17,23 @@ */ package org.apache.hadoop.mapred; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapreduce.checkpoint.EnumCounter; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; - -import junit.framework.Assert; +import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.checkpoint.CheckpointID; +import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent; @@ -46,21 +43,31 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler; import 
org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.SystemClock; + import org.junit.Test; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; public class TestTaskAttemptListenerImpl { - public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl { + public static class MockTaskAttemptListenerImpl + extends TaskAttemptListenerImpl { public MockTaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, RMHeartbeatHandler rmHeartbeatHandler, - TaskHeartbeatHandler hbHandler) { - super(context, jobTokenSecretManager, rmHeartbeatHandler, null); + TaskHeartbeatHandler hbHandler, + AMPreemptionPolicy policy) { + + super(context, jobTokenSecretManager, rmHeartbeatHandler, policy); this.taskHeartbeatHandler = hbHandler; } @@ -87,9 +94,16 @@ public class TestTaskAttemptListenerImpl { RMHeartbeatHandler rmHeartbeatHandler = mock(RMHeartbeatHandler.class); TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + Dispatcher dispatcher = mock(Dispatcher.class); + EventHandler ea = mock(EventHandler.class); + when(dispatcher.getEventHandler()).thenReturn(ea); + + when(appCtx.getEventHandler()).thenReturn(ea); + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); MockTaskAttemptListenerImpl listener = new MockTaskAttemptListenerImpl(appCtx, secret, - rmHeartbeatHandler, hbHandler); + rmHeartbeatHandler, hbHandler, policy); Configuration conf = new Configuration(); listener.init(conf); listener.start(); @@ -144,7 +158,7 @@ public class TestTaskAttemptListenerImpl { assertNotNull(jvmid); try { JVMId.forName("jvm_001_002_m_004_006"); - Assert.fail(); + fail(); } catch (IllegalArgumentException e) { assertEquals(e.getMessage(), "TaskId string : jvm_001_002_m_004_006 is not properly formed"); @@ -190,8 +204,14 @@ public class TestTaskAttemptListenerImpl { RMHeartbeatHandler rmHeartbeatHandler = mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { + Dispatcher dispatcher = mock(Dispatcher.class); + EventHandler ea = mock(EventHandler.class); + when(dispatcher.getEventHandler()).thenReturn(ea); + when(appCtx.getEventHandler()).thenReturn(ea); + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl( + appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -219,7 +239,8 @@ public class TestTaskAttemptListenerImpl { isMap ? 
org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0); - RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); TaskAttemptCompletionEvent tce = recordFactory .newRecordInstance(TaskAttemptCompletionEvent.class); tce.setEventId(eventId); @@ -244,8 +265,14 @@ public class TestTaskAttemptListenerImpl { RMHeartbeatHandler rmHeartbeatHandler = mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); - TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { + Dispatcher dispatcher = mock(Dispatcher.class); + EventHandler ea = mock(EventHandler.class); + when(dispatcher.getEventHandler()).thenReturn(ea); + when(appCtx.getEventHandler()).thenReturn(ea); + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl( + appCtx, secret, rmHeartbeatHandler, policy) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -270,4 +297,88 @@ public class TestTaskAttemptListenerImpl { listener.stop(); } + + @Test + public void testCheckpointIDTracking() + throws IOException, InterruptedException { + + SystemClock clock = new SystemClock(); + + org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = + mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); + when(mockTask.canCommit(any(TaskAttemptId.class))).thenReturn(true); + Job mockJob = mock(Job.class); + when(mockJob.getTask(any(TaskId.class))).thenReturn(mockTask); + + Dispatcher dispatcher = mock(Dispatcher.class); + EventHandler ea = mock(EventHandler.class); + when(dispatcher.getEventHandler()).thenReturn(ea); + + RMHeartbeatHandler rmHeartbeatHandler = + mock(RMHeartbeatHandler.class); + + AppContext appCtx = mock(AppContext.class); + when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob); + when(appCtx.getClock()).thenReturn(clock); + when(appCtx.getEventHandler()).thenReturn(ea); + JobTokenSecretManager secret = mock(JobTokenSecretManager.class); + final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); + when(appCtx.getEventHandler()).thenReturn(ea); + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(appCtx); + TaskAttemptListenerImpl listener = new TaskAttemptListenerImpl( + appCtx, secret, rmHeartbeatHandler, policy) { + @Override + protected void registerHeartbeatHandler(Configuration conf) { + taskHeartbeatHandler = hbHandler; + } + }; + + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.TASK_PREEMPTION, true); + //conf.setBoolean("preemption.reduce", true); + + listener.init(conf); + listener.start(); + + TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0); + + List<Path> partialOut = new ArrayList<Path>(); + partialOut.add(new Path("/prev1")); + partialOut.add(new Path("/prev2")); + + Counters counters = mock(Counters.class); + final long CBYTES = 64L * 1024 * 1024; + final long CTIME = 4344L; + final Path CLOC = new Path("/test/1"); + Counter cbytes = mock(Counter.class); + when(cbytes.getValue()).thenReturn(CBYTES); + Counter ctime = mock(Counter.class); + when(ctime.getValue()).thenReturn(CTIME); + when(counters.findCounter(eq(EnumCounter.CHECKPOINT_BYTES)))
.thenReturn(cbytes); + when(counters.findCounter(eq(EnumCounter.CHECKPOINT_MS))) + .thenReturn(ctime); + + // propagating a taskstatus that contains a checkpoint id + TaskCheckpointID incid = new TaskCheckpointID(new FSCheckpointID( + CLOC), partialOut, counters); + listener.setCheckpointID( + org.apache.hadoop.mapred.TaskID.downgrade(tid.getTaskID()), incid); + + // and try to get it back + CheckpointID outcid = listener.getCheckpointID(tid.getTaskID()); + TaskCheckpointID tcid = (TaskCheckpointID) outcid; + assertEquals(CBYTES, tcid.getCheckpointBytes()); + assertEquals(CTIME, tcid.getCheckpointTime()); + assertTrue(partialOut.containsAll(tcid.getPartialCommittedOutput())); + assertTrue(tcid.getPartialCommittedOutput().containsAll(partialOut)); + + //assert it worked + assert outcid == incid; + + listener.stop(); + + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java new file mode 100644 index 00000000000..b62c1c9cd80 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestCheckpointPreemptionPolicy.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.v2.app; + +import org.apache.hadoop.yarn.api.records.PreemptionContract; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.util.resource.Resources; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.mapred.TaskAttemptListenerImpl; +import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.MRAppMaster.RunningAppContext; +import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest; +import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.junit.Before; +import org.junit.Test; + +public class TestCheckpointPreemptionPolicy { + + TaskAttemptListenerImpl pel = null; + RMContainerAllocator r; + JobId jid; + RunningAppContext mActxt; + Set<ContainerId> preemptedContainers = new HashSet<ContainerId>(); + Map<ContainerId, TaskAttemptId> assignedContainers = + new HashMap<ContainerId, TaskAttemptId>(); + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + HashMap<ContainerId, Resource> contToResourceMap = + new HashMap<ContainerId, Resource>(); + + private int minAlloc = 1024; + + @Before + @SuppressWarnings("rawtypes") // mocked generics + public void setup() { + ApplicationId appId = ApplicationId.newInstance(200, 1); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + jid = MRBuilderUtils.newJobId(appId, 1); + + mActxt = mock(RunningAppContext.class); + EventHandler ea = mock(EventHandler.class); + when(mActxt.getEventHandler()).thenReturn(ea); + for (int i = 0; i < 40; ++i) { + ContainerId cId = ContainerId.newInstance(appAttemptId, i); + if (0 == i % 7) { + preemptedContainers.add(cId); + } + TaskId tId = 0 == i % 2 + ?
MRBuilderUtils.newTaskId(jid, i / 2, TaskType.MAP) + : MRBuilderUtils.newTaskId(jid, i / 2 + 1, TaskType.REDUCE); + assignedContainers.put(cId, MRBuilderUtils.newTaskAttemptId(tId, 0)); + contToResourceMap.put(cId, Resource.newInstance(2 * minAlloc, 2)); + } + + for (Map.Entry<ContainerId, TaskAttemptId> ent : + assignedContainers.entrySet()) { + System.out.println("cont:" + ent.getKey().getId() + + " type:" + ent.getValue().getTaskId().getTaskType() + + " res:" + contToResourceMap.get(ent.getKey()).getMemory() + "MB" ); + } + } + + @Test + public void testStrictPreemptionContract() { + + final Map<ContainerId, TaskAttemptId> containers = assignedContainers; + AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() { + @Override + public TaskAttemptId getTaskAttempt(ContainerId cId) { + return containers.get(cId); + } + @Override + public List<Container> getContainers(TaskType t) { + List<Container> p = new ArrayList<Container>(); + for (Map.Entry<ContainerId, TaskAttemptId> ent : + assignedContainers.entrySet()) { + if (ent.getValue().getTaskId().getTaskType().equals(t)) { + p.add(Container.newInstance(ent.getKey(), null, null, + contToResourceMap.get(ent.getKey()), + Priority.newInstance(0), null)); + } + } + return p; + } + }; + + PreemptionMessage pM = generatePreemptionMessage(preemptedContainers, + contToResourceMap, Resource.newInstance(1024, 1), true); + + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(mActxt); + policy.preempt(mPctxt, pM); + + + for (ContainerId c : preemptedContainers) { + TaskAttemptId t = assignedContainers.get(c); + if (TaskType.MAP.equals(t.getTaskId().getTaskType())) { + assert policy.isPreempted(t) == false; + } else { + assert policy.isPreempted(t); + } + } + } + + + @Test + public void testPreemptionContract() { + final Map<ContainerId, TaskAttemptId> containers = assignedContainers; + AMPreemptionPolicy.Context mPctxt = new AMPreemptionPolicy.Context() { + @Override + public TaskAttemptId getTaskAttempt(ContainerId cId) { + return containers.get(cId); + } + + @Override + public List<Container> getContainers(TaskType t) { + List<Container> p = new ArrayList<Container>(); + for (Map.Entry<ContainerId, TaskAttemptId> ent : + assignedContainers.entrySet()) { + if (ent.getValue().getTaskId().getTaskType().equals(t)) { + p.add(Container.newInstance(ent.getKey(), null, null, + contToResourceMap.get(ent.getKey()), + Priority.newInstance(0), null)); + } + } + return p; + } + }; + + PreemptionMessage pM = generatePreemptionMessage(preemptedContainers, + contToResourceMap, Resource.newInstance(minAlloc, 1), false); + + CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy(); + policy.init(mActxt); + + int supposedMemPreemption = pM.getContract().getResourceRequest() + .get(0).getResourceRequest().getCapability().getMemory() + * pM.getContract().getResourceRequest().get(0).getResourceRequest() + .getNumContainers(); + + // first round of preemption + policy.preempt(mPctxt, pM); + List<TaskAttemptId> preempting = + validatePreemption(pM, policy, supposedMemPreemption); + + // redundant message + policy.preempt(mPctxt, pM); + List<TaskAttemptId> preempting2 = + validatePreemption(pM, policy, supposedMemPreemption); + + // check that nothing got added + assert preempting2.equals(preempting); + + // simulate 2 task completions/successful preemption + policy.handleCompletedContainer(preempting.get(0)); + policy.handleCompletedContainer(preempting.get(1)); + + // remove from assignedContainers + Iterator<Map.Entry<ContainerId, TaskAttemptId>> it = + assignedContainers.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<ContainerId, TaskAttemptId> ent = it.next(); + if (ent.getValue().equals(preempting.get(0)) || + ent.getValue().equals(preempting.get(1))) + it.remove(); + } + + // one more
message asking for preemption + policy.preempt(mPctxt, pM); + + // triggers preemption of 2 more containers (i.e., the preemption set changes) + List<TaskAttemptId> preempting3 = + validatePreemption(pM, policy, supposedMemPreemption); + assert preempting3.equals(preempting2) == false; + } + + private List<TaskAttemptId> validatePreemption(PreemptionMessage pM, + CheckpointAMPreemptionPolicy policy, int supposedMemPreemption) { + Resource effectivelyPreempted = Resource.newInstance(0, 0); + + List<TaskAttemptId> preempting = new ArrayList<TaskAttemptId>(); + + for (Map.Entry<ContainerId, TaskAttemptId> ent : + assignedContainers.entrySet()) { + if (policy.isPreempted(ent.getValue())) { + Resources.addTo(effectivelyPreempted, contToResourceMap.get(ent.getKey())); + // preempt only reducers + if (policy.isPreempted(ent.getValue())) { + assertEquals(TaskType.REDUCE, ent.getValue().getTaskId().getTaskType()); + preempting.add(ent.getValue()); + } + } + } + + // preempt enough + assert (effectivelyPreempted.getMemory() >= supposedMemPreemption) + : " preempted: " + effectivelyPreempted.getMemory(); + + // but do not preempt much more than needed (at most one minimum allocation) + assert effectivelyPreempted.getMemory() <= supposedMemPreemption + minAlloc; + return preempting; + } + + private PreemptionMessage generatePreemptionMessage( + Set<ContainerId> containerToPreempt, + HashMap<ContainerId, Resource> resPerCont, + Resource minimumAllocation, boolean strict) { + + Set<ContainerId> currentContPreemption = Collections.unmodifiableSet( + new HashSet<ContainerId>(containerToPreempt)); + containerToPreempt.clear(); + Resource tot = Resource.newInstance(0, 0); + for (ContainerId c : currentContPreemption) { + Resources.addTo(tot, + resPerCont.get(c)); + } + int numCont = (int) Math.ceil(tot.getMemory() / + (double) minimumAllocation.getMemory()); + ResourceRequest rr = ResourceRequest.newInstance( + Priority.newInstance(0), ResourceRequest.ANY, + minimumAllocation, numCont); + if (strict) { + return generatePreemptionMessage(new Allocation(null, null, + currentContPreemption, null, null)); + } + return generatePreemptionMessage(new Allocation(null, null, + null, currentContPreemption, + Collections.singletonList(rr))); + } + + + private PreemptionMessage generatePreemptionMessage(Allocation allocation) { + PreemptionMessage pMsg = null; + // assemble strict preemption request + if (allocation.getStrictContainerPreemptions() != null) { + pMsg = recordFactory.newRecordInstance(PreemptionMessage.class); + StrictPreemptionContract pStrict = + recordFactory.newRecordInstance(StrictPreemptionContract.class); + Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>(); + for (ContainerId cId : allocation.getStrictContainerPreemptions()) { + PreemptionContainer pc = + recordFactory.newRecordInstance(PreemptionContainer.class); + pc.setId(cId); + pCont.add(pc); + } + pStrict.setContainers(pCont); + pMsg.setStrictContract(pStrict); + } + + // assemble negotiable preemption request + if (allocation.getResourcePreemptions() != null && + allocation.getResourcePreemptions().size() > 0 && + allocation.getContainerPreemptions() != null && + allocation.getContainerPreemptions().size() > 0) { + if (pMsg == null) { + pMsg = recordFactory.newRecordInstance(PreemptionMessage.class); + } + PreemptionContract contract = + recordFactory.newRecordInstance(PreemptionContract.class); + Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>(); + for (ContainerId cId : allocation.getContainerPreemptions()) { + PreemptionContainer pc = + recordFactory.newRecordInstance(PreemptionContainer.class); + pc.setId(cId); + pCont.add(pc); + } + List<PreemptionResourceRequest> pRes = + new ArrayList<PreemptionResourceRequest>(); + for (ResourceRequest crr : allocation.getResourcePreemptions()) { + PreemptionResourceRequest prr =
recordFactory.newRecordInstance(PreemptionResourceRequest.class); + prr.setResourceRequest(crr); + pRes.add(prr); + } + contract.setContainers(pCont); + contract.setResourceRequest(pRes); + pMsg.setContract(contract); + } + return pMsg; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 2bb7dc83655..b6855024f36 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.QueueInfo; import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.TaskTrackerInfo; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.protocol.ClientProtocol; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; @@ -575,10 +576,17 @@ public class LocalJobRunner implements ClientProtocol { // TaskUmbilicalProtocol methods + @Override public JvmTask getTask(JvmContext context) { return null; } - public synchronized boolean statusUpdate(TaskAttemptID taskId, + @Override + public synchronized AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) throws IOException, InterruptedException { + AMFeedback feedback = new AMFeedback(); + feedback.setTaskFound(true); + if (null == taskStatus) { + return feedback; + } // Serialize as we would if distributed in order to make deep copy ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos); @@ -618,7 +626,7 @@ public class LocalJobRunner implements ClientProtocol { } // ignore phase - return true; + return feedback; } /** Return the current values of the counters for this job, @@ -654,24 +662,24 @@ public class LocalJobRunner implements ClientProtocol { statusUpdate(taskid, taskStatus); } + @Override public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) { // Ignore for now } + @Override public void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range) throws IOException { LOG.info("Task " + taskid + " reportedNextRecordRange " + range); } - public boolean ping(TaskAttemptID taskid) throws IOException { - return true; - } - + @Override public boolean canCommit(TaskAttemptID taskid) throws IOException { return true; } + @Override public void done(TaskAttemptID taskId) throws IOException { int taskIndex = mapIds.indexOf(taskId); if (taskIndex >= 0) { // mapping @@ -681,11 +689,13 @@ public class LocalJobRunner implements ClientProtocol { } } + @Override public synchronized void fsError(TaskAttemptID taskId, String message) throws IOException { LOG.fatal("FSError: "+ message + "from task: " + taskId); } + @Override public void shuffleError(TaskAttemptID taskId, String message) throws IOException { LOG.fatal("shuffleError: "+ message + "from task: " + taskId); } @@ -695,12 +705,30 @@ public class LocalJobRunner implements ClientProtocol { LOG.fatal("Fatal: "+ msg + "from task: " + taskId); } + @Override public 
MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) throws IOException { return new MapTaskCompletionEventsUpdate( org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false); } - + + @Override + public void preempted(TaskAttemptID taskId, TaskStatus taskStatus) + throws IOException, InterruptedException { + // ignore + } + + @Override + public TaskCheckpointID getCheckpointID(TaskID taskId) { + // ignore + return null; + } + + @Override + public void setCheckpointID(TaskID downgrade, TaskCheckpointID cid) { + // ignore + } + } public LocalJobRunner(Configuration conf) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java new file mode 100644 index 00000000000..210ac959c43 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AMFeedback.java @@ -0,0 +1,63 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * This class is a simple struct to include both the taskFound information and + * a possible preemption request coming from the AM. 
+ */ +public class AMFeedback implements Writable { + + boolean taskFound; + boolean preemption; + + public void setTaskFound(boolean t) { + taskFound = t; + } + + public boolean getTaskFound() { + return taskFound; + } + + public void setPreemption(boolean preemption) { + this.preemption = preemption; + } + + public boolean getPreemption() { + return preemption; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeBoolean(taskFound); + out.writeBoolean(preemption); + } + + @Override + public void readFields(DataInput in) throws IOException { + taskFound = in.readBoolean(); + preemption = in.readBoolean(); + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 685e61cfb63..660ffc65ad3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -187,6 +187,7 @@ abstract public class Task implements Writable, Configurable { protected SecretKey tokenSecret; protected SecretKey shuffleSecret; protected GcTimeUpdater gcUpdater; + final AtomicBoolean mustPreempt = new AtomicBoolean(false); //////////////////////////////////////////// // Constructors @@ -711,6 +712,7 @@ abstract public class Task implements Writable, Configurable { } try { boolean taskFound = true; // whether TT knows about this task + AMFeedback amFeedback = null; // sleep for a bit synchronized(lock) { if (taskDone.get()) { @@ -728,12 +730,14 @@ abstract public class Task implements Writable, Configurable { taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), counters); - taskFound = umbilical.statusUpdate(taskId, taskStatus); + amFeedback = umbilical.statusUpdate(taskId, taskStatus); + taskFound = amFeedback.getTaskFound(); taskStatus.clearStatus(); } else { // send ping - taskFound = umbilical.ping(taskId); + amFeedback = umbilical.statusUpdate(taskId, null); + taskFound = amFeedback.getTaskFound(); } // if Task Tracker is not aware of our task ID (probably because it died and @@ -744,6 +748,17 @@ abstract public class Task implements Writable, Configurable { System.exit(66); } + + // Set a flag that says we should preempt. This is read by + // ReduceTasks at points in the execution where it is + // safe/easy to preempt + boolean lastPreempt = mustPreempt.get(); + mustPreempt.set(mustPreempt.get() || amFeedback.getPreemption()); + + if (lastPreempt ^ mustPreempt.get()) { + LOG.info("PREEMPTION TASK: setting mustPreempt to " + + mustPreempt.get() + " given " + amFeedback.getPreemption() + + " for " + taskId + " task status: " + taskStatus.getPhase()); + } sendProgress = resetProgressFlag(); remainingRetries = MAX_RETRIES; } @@ -992,10 +1007,17 @@ abstract public class Task implements Writable, Configurable { public void done(TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException { - LOG.info("Task:" + taskId + " is done."
- + " And is in the process of committing"); updateCounters(); - + if (taskStatus.getRunState() == TaskStatus.State.PREEMPTED ) { + // If we are preempted, do no output promotion; signal done and exit + committer.commitTask(taskContext); + umbilical.preempted(taskId, taskStatus); + taskDone.set(true); + reporter.stopCommunicationThread(); + return; + } + LOG.info("Task:" + taskId + " is done." + + " And is in the process of committing"); boolean commitRequired = isCommitRequired(); if (commitRequired) { int retries = MAX_RETRIES; @@ -1054,7 +1076,7 @@ abstract public class Task implements Writable, Configurable { int retries = MAX_RETRIES; while (true) { try { - if (!umbilical.statusUpdate(getTaskID(), taskStatus)) { + if (!umbilical.statusUpdate(getTaskID(), taskStatus).getTaskFound()) { LOG.warn("Parent died. Exiting "+taskId); System.exit(66); } @@ -1098,8 +1120,8 @@ abstract public class Task implements Writable, Configurable { if (isMapTask() && conf.getNumReduceTasks() > 0) { try { Path mapOutput = mapOutputFile.getOutputFile(); - FileSystem localFS = FileSystem.getLocal(conf); - return localFS.getFileStatus(mapOutput).getLen(); + FileSystem fs = mapOutput.getFileSystem(conf); + return fs.getFileStatus(mapOutput).getLen(); } catch (IOException e) { LOG.warn ("Could not find output size " , e); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java index 7bd5eb90616..a5c12de2627 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java @@ -51,7 +51,7 @@ public abstract class TaskStatus implements Writable, Cloneable { @InterfaceAudience.Private @InterfaceStability.Unstable public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, - COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN} + COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN, PREEMPTED} private final TaskAttemptID taskid; private float progress; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java index 425c3b87dad..5df02c7b5b1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java @@ -24,6 +24,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.mapred.JvmTask; +import org.apache.hadoop.mapreduce.checkpoint.CheckpointID; +import org.apache.hadoop.mapreduce.checkpoint.FSCheckpointID; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.mapreduce.security.token.JobTokenSelector; import org.apache.hadoop.security.token.TokenInfo; @@ -64,9 +67,10 @@ public interface TaskUmbilicalProtocol extends 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
index 102b84f2483..17e6922fc3b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java
@@ -34,37 +34,31 @@ import org.apache.hadoop.mapred.Counters;
  * cost of checkpoints and other counters. This is sent by the task to the AM
  * to be stored and provided to the next execution of the same task.
  */
-public class TaskCheckpointID implements CheckpointID{
+public class TaskCheckpointID implements CheckpointID {

-  FSCheckpointID rawId;
-  private List<Path> partialOutput;
-  private Counters counters;
+  final FSCheckpointID rawId;
+  private final List<Path> partialOutput;
+  private final Counters counters;

   public TaskCheckpointID() {
-    this.rawId = new FSCheckpointID();
-    this.partialOutput = new ArrayList<Path>();
+    this(new FSCheckpointID(), new ArrayList<Path>(), new Counters());
   }

   public TaskCheckpointID(FSCheckpointID rawId, List<Path> partialOutput,
           Counters counters) {
     this.rawId = rawId;
     this.counters = counters;
-    if(partialOutput == null)
-      this.partialOutput = new ArrayList<Path>();
-    else
-      this.partialOutput = partialOutput;
+    this.partialOutput = null == partialOutput
+      ? new ArrayList<Path>()
+      : partialOutput;
   }

   @Override
   public void write(DataOutput out) throws IOException {
     counters.write(out);
-    if (partialOutput == null) {
-      WritableUtils.writeVLong(out, 0L);
-    } else {
-      WritableUtils.writeVLong(out, partialOutput.size());
-      for(Path p:partialOutput){
-        Text.writeString(out, p.toString());
-      }
+    WritableUtils.writeVLong(out, partialOutput.size());
+    for (Path p : partialOutput) {
+      Text.writeString(out, p.toString());
     }
     rawId.write(out);
   }
@@ -74,21 +68,22 @@ public class TaskCheckpointID implements CheckpointID{
     partialOutput.clear();
     counters.readFields(in);
     long numPout = WritableUtils.readVLong(in);
-    for(int i=0;i
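Putting the two checkpoint methods together, a hypothetical sketch of the intended flow for a restarted attempt (helper names invented; only the umbilical calls come from the interface above):

import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;

// Invented helper illustrating the intended get/set checkpoint flow.
class CheckpointFlowSketch {
  static TaskCheckpointID resumePointFor(TaskUmbilicalProtocol umbilical,
      TaskID tid) {
    // On (re)start, ask the AM whether a previous attempt left a
    // checkpoint; null means start the computation from scratch.
    return umbilical.getCheckpointID(tid);
  }

  static void publish(TaskUmbilicalProtocol umbilical, TaskID tid,
      TaskCheckpointID cid) {
    // After taking a checkpoint, store it in the AM so the next attempt
    // of this task can locate the data and resume from it.
    umbilical.setCheckpointID(tid, cid);
  }
}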