From 9ca394d54dd24e67867c845a58150f6b51761512 Mon Sep 17 00:00:00 2001 From: Christopher Douglas Date: Tue, 17 Dec 2013 22:54:31 +0000 Subject: [PATCH] MAPREDUCE-5189. Add policies and wiring to respond to preemption requests from YARN. Contributed by Carlo Curino. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551748 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../mapred/TaskAttemptListenerImpl.java | 6 +- .../hadoop/mapreduce/v2/app/MRAppMaster.java | 26 +++- .../v2/app/rm/RMContainerAllocator.java | 51 ++++++- .../app/rm/preemption/AMPreemptionPolicy.java | 117 ++++++++++++++++ .../rm/preemption/KillAMPreemptionPolicy.java | 111 +++++++++++++++ .../rm/preemption/NoopAMPreemptionPolicy.java | 72 ++++++++++ .../mapred/TestTaskAttemptListenerImpl.java | 6 +- .../apache/hadoop/mapreduce/v2/app/MRApp.java | 4 +- .../mapreduce/v2/app/MRAppBenchmark.java | 9 +- .../hadoop/mapreduce/v2/app/TestFail.java | 6 +- .../v2/app/TestRMContainerAllocator.java | 16 ++- .../apache/hadoop/mapreduce/JobCounter.java | 6 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 11 +- .../mapreduce/checkpoint/EnumCounter.java | 26 ++++ .../checkpoint/TaskCheckpointID.java | 126 ++++++++++++++++++ .../hadoop/mapreduce/JobCounter.properties | 4 + 17 files changed, 575 insertions(+), 25 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 2a80119b0f9..63daf66a3ec 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -74,6 +74,9 @@ Trunk (Unreleased) MAPREDUCE-5197. Add a service for checkpointing task state. (Carlo Curino via cdouglas) + MAPREDUCE-5189. Add policies and wiring to respond to preemption requests + from YARN. (Carlo Curino via cdouglas) + BUG FIXES MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index aae95d47fe1..8af7e379873 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.PolicyProvider; @@ -84,14 +85,17 @@ public class TaskAttemptListenerImpl extends CompositeService .newSetFromMap(new ConcurrentHashMap()); private JobTokenSecretManager jobTokenSecretManager = null; + private AMPreemptionPolicy preemptionPolicy; public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager, - RMHeartbeatHandler rmHeartbeatHandler) { + RMHeartbeatHandler rmHeartbeatHandler, + AMPreemptionPolicy preemptionPolicy) { super(TaskAttemptListenerImpl.class.getName()); this.context = context; this.jobTokenSecretManager = jobTokenSecretManager; this.rmHeartbeatHandler = rmHeartbeatHandler; + this.preemptionPolicy = preemptionPolicy; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index b60b64764a2..ca6aadfb1cc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -102,6 +102,8 @@ import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator; import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent; @@ -188,8 +190,8 @@ public class MRAppMaster extends CompositeService { private ContainerLauncher containerLauncher; private EventHandler committerEventHandler; private Speculator speculator; - private TaskAttemptListener taskAttemptListener; - private JobTokenSecretManager jobTokenSecretManager = + protected TaskAttemptListener taskAttemptListener; + protected JobTokenSecretManager jobTokenSecretManager = new JobTokenSecretManager(); private JobId jobId; private boolean newApiCommitter; @@ -197,6 +199,7 @@ public class MRAppMaster extends CompositeService { private JobEventDispatcher jobEventDispatcher; private JobHistoryEventHandler jobHistoryEventHandler; private SpeculatorEventDispatcher speculatorEventDispatcher; + private AMPreemptionPolicy preemptionPolicy; private Job job; private Credentials jobCredentials = new Credentials(); // Filled during init @@ -383,8 +386,12 @@ public class MRAppMaster extends CompositeService { committerEventHandler = createCommitterEventHandler(context, committer); addIfService(committerEventHandler); + //policy handling preemption requests from RM + preemptionPolicy = createPreemptionPolicy(conf); + preemptionPolicy.init(context); + //service to handle requests to TaskUmbilicalProtocol - taskAttemptListener = createTaskAttemptListener(context); + taskAttemptListener = createTaskAttemptListener(context, preemptionPolicy); addIfService(taskAttemptListener); //service to log job history events @@ -475,6 +482,12 @@ public class MRAppMaster extends CompositeService { return committer; } + protected AMPreemptionPolicy createPreemptionPolicy(Configuration conf) { + return ReflectionUtils.newInstance(conf.getClass( + MRJobConfig.MR_AM_PREEMPTION_POLICY, + NoopAMPreemptionPolicy.class, AMPreemptionPolicy.class), conf); + } + protected boolean keepJobFiles(JobConf conf) { return (conf.getKeepTaskFilesPattern() != null || conf .getKeepFailedTaskFiles()); @@ -692,10 +705,11 @@ public class MRAppMaster extends CompositeService { } } - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener(AppContext context, + AMPreemptionPolicy preemptionPolicy) { TaskAttemptListener lis = new TaskAttemptListenerImpl(context, jobTokenSecretManager, - getRMHeartbeatHandler()); + getRMHeartbeatHandler(), preemptionPolicy); return lis; } @@ -805,7 +819,7 @@ public class MRAppMaster extends CompositeService { , containerID); } else { this.containerAllocator = new RMContainerAllocator( - this.clientService, this.context); + this.clientService, this.context, preemptionPolicy); } ((Service)this.containerAllocator).init(getConfig()); ((Service)this.containerAllocator).start(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d6e45931632..dd739f2b7c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; @@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -147,13 +149,17 @@ public class RMContainerAllocator extends RMContainerRequestor private long retryInterval; private long retrystartTime; + private final AMPreemptionPolicy preemptionPolicy; + BlockingQueue eventQueue = new LinkedBlockingQueue(); private ScheduleStats scheduleStats = new ScheduleStats(); - public RMContainerAllocator(ClientService clientService, AppContext context) { + public RMContainerAllocator(ClientService clientService, AppContext context, + AMPreemptionPolicy preemptionPolicy) { super(clientService, context); + this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); } @@ -361,11 +367,15 @@ public class RMContainerAllocator extends RMContainerRequestor LOG.error("Could not deallocate container for task attemptId " + aId); } + preemptionPolicy.handleCompletedContainer(event.getAttemptID()); } else if ( event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) { ContainerFailedEvent fEv = (ContainerFailedEvent) event; String host = getHost(fEv.getContMgrAddress()); containerFailedOnHost(host); + // propagate failures to preemption policy to discard checkpoints for + // failed tasks + preemptionPolicy.handleFailedContainer(event.getAttemptID()); } } @@ -399,7 +409,7 @@ public class RMContainerAllocator extends RMContainerRequestor } scheduledRequests.reduces.clear(); - //preempt for making space for atleast one map + //preempt for making space for at least one map int premeptionLimit = Math.max(mapResourceReqt, (int) (maxReducePreemptionLimit * memLimit)); @@ -409,7 +419,7 @@ public class RMContainerAllocator extends RMContainerRequestor int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - LOG.info("Going to preempt " + toPreempt); + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); assignedRequests.preemptReduce(toPreempt); } } @@ -595,6 +605,14 @@ public class RMContainerAllocator extends RMContainerRequestor } List finishedContainers = response.getCompletedContainersStatuses(); + + // propagate preemption requests + final PreemptionMessage preemptReq = response.getPreemptionMessage(); + if (preemptReq != null) { + preemptionPolicy.preempt( + new PreemptionContext(assignedRequests), preemptReq); + } + if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { //something changed recalculateReduceSchedule = true; @@ -630,7 +648,9 @@ public class RMContainerAllocator extends RMContainerRequestor String diagnostics = StringInterner.weakIntern(cont.getDiagnostics()); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnostics)); - } + + preemptionPolicy.handleCompletedContainer(attemptID); + } } return newContainers; } @@ -1232,4 +1252,27 @@ public class RMContainerAllocator extends RMContainerRequestor " RackLocal:" + rackLocalAssigned); } } + + static class PreemptionContext extends AMPreemptionPolicy.Context { + final AssignedRequests reqs; + + PreemptionContext(AssignedRequests reqs) { + this.reqs = reqs; + } + @Override + public TaskAttemptId getTaskAttempt(ContainerId container) { + return reqs.get(container); + } + + @Override + public List getContainers(TaskType t){ + if(TaskType.REDUCE.equals(t)) + return new ArrayList(reqs.reduces.values()); + if(TaskType.MAP.equals(t)) + return new ArrayList(reqs.maps.values()); + return null; + } + + } + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java new file mode 100644 index 00000000000..0bbe75bdea3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/AMPreemptionPolicy.java @@ -0,0 +1,117 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import java.util.List; + +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; + +/** + * Policy encoding the {@link org.apache.hadoop.mapreduce.v2.app.MRAppMaster} + * response to preemption requests from the ResourceManager. + * @see org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator + */ +public interface AMPreemptionPolicy { + + public abstract class Context { + + /** + * @param container ID of container to preempt + * @return Task associated with the running container or null + * if no task is bound to that container. + */ + public abstract TaskAttemptId getTaskAttempt(ContainerId container); + + /** + * Method provides the complete list of containers running task of type t + * for this AM. + * @param t the type of containers + * @return a map containing + */ + public abstract List getContainers(TaskType t); + + } + + public void init(AppContext context); + + /** + * Callback informing the policy of ResourceManager. requests for resources + * to return to the cluster. The policy may take arbitrary action to satisfy + * requests by checkpointing task state, returning containers, or ignoring + * requests. The RM may elect to enforce these requests by forcibly killing + * containers not returned after some duration. + * @param context Handle to the current state of running containers + * @param preemptionRequests Request from RM for resources to return. + */ + public void preempt(Context context, PreemptionMessage preemptionRequests); + + /** + * This method is invoked by components interested to learn whether a certain + * task is being preempted. + * @param attemptID Task attempt to query + * @return true if this attempt is being preempted + */ + public boolean isPreempted(TaskAttemptId attemptID); + + /** + * This method is used to report to the policy that a certain task has been + * successfully preempted (for bookeeping, counters, etc..) + * @param attemptID Task attempt that preempted + */ + public void reportSuccessfulPreemption(TaskAttemptID attemptID); + + /** + * Callback informing the policy of containers exiting with a failure. This + * allows the policy to implemnt cleanup/compensating actions. + * @param attemptID Task attempt that failed + */ + public void handleFailedContainer(TaskAttemptId attemptID); + + /** + * Callback informing the policy of containers exiting cleanly. This is + * reported to the policy for bookeeping purposes. + * @param attemptID Task attempt that completed + */ + public void handleCompletedContainer(TaskAttemptId attemptID); + + /** + * Method to retrieve the latest checkpoint for a given {@link TaskID} + * @param taskId TaskID + * @return CheckpointID associated with this task or null + */ + public TaskCheckpointID getCheckpointID(TaskID taskId); + + /** + * Method to store the latest {@link + * org.apache.hadoop.mapreduce.checkpoint.CheckpointID} for a given {@link + * TaskID}. Assigning a null is akin to remove all previous checkpoints for + * this task. + * @param taskId TaskID + * @param cid Checkpoint to assign or null to remove it. + */ + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid); + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java new file mode 100644 index 00000000000..100ef4f7af4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/KillAMPreemptionPolicy.java @@ -0,0 +1,111 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; +import org.apache.hadoop.yarn.event.EventHandler; + +/** + * Sample policy that aggressively kills tasks when requested. + */ +public class KillAMPreemptionPolicy implements AMPreemptionPolicy { + + private static final Log LOG = + LogFactory.getLog(KillAMPreemptionPolicy.class); + + @SuppressWarnings("rawtypes") + private EventHandler dispatcher = null; + + @Override + public void init(AppContext context) { + dispatcher = context.getEventHandler(); + } + + @Override + public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { + // for both strict and negotiable preemption requests kill the + // container + for (PreemptionContainer c : + preemptionRequests.getStrictContract().getContainers()) { + killContainer(ctxt, c); + } + for (PreemptionContainer c : + preemptionRequests.getContract().getContainers()) { + killContainer(ctxt, c); + } + } + + @SuppressWarnings("unchecked") + private void killContainer(Context ctxt, PreemptionContainer c){ + ContainerId reqCont = c.getId(); + TaskAttemptId reqTask = ctxt.getTaskAttempt(reqCont); + LOG.info("Evicting " + reqTask); + dispatcher.handle(new TaskAttemptEvent(reqTask, + TaskAttemptEventType.TA_KILL)); + + // add preemption to counters + JobCounterUpdateEvent jce = new JobCounterUpdateEvent(reqTask + .getTaskId().getJobId()); + jce.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1); + dispatcher.handle(jce); + } + + @Override + public void handleFailedContainer(TaskAttemptId attemptID) { + // ignore + } + + @Override + public boolean isPreempted(TaskAttemptId yarnAttemptID) { + return false; + } + + @Override + public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) { + // ignore + } + + @Override + public TaskCheckpointID getCheckpointID(TaskID taskId) { + return null; + } + + @Override + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + // ignore + } + + @Override + public void handleCompletedContainer(TaskAttemptId attemptID) { + // ignore + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java new file mode 100644 index 00000000000..0c020aca22b --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/preemption/NoopAMPreemptionPolicy.java @@ -0,0 +1,72 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.v2.app.rm.preemption; + +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.yarn.api.records.PreemptionMessage; + +/** + * NoOp policy that ignores all the requests for preemption. + */ +public class NoopAMPreemptionPolicy implements AMPreemptionPolicy { + + @Override + public void init(AppContext context){ + // do nothing + } + + @Override + public void preempt(Context ctxt, PreemptionMessage preemptionRequests) { + // do nothing, ignore all requeusts + } + + @Override + public void handleFailedContainer(TaskAttemptId attemptID) { + // do nothing + } + + @Override + public boolean isPreempted(TaskAttemptId yarnAttemptID) { + return false; + } + + @Override + public void reportSuccessfulPreemption(TaskAttemptID taskAttemptID) { + // ignore + } + + @Override + public TaskCheckpointID getCheckpointID(TaskID taskId) { + return null; + } + + @Override + public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { + // ignore + } + + @Override + public void handleCompletedContainer(TaskAttemptId attemptID) { + // ignore + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java index 506523db76b..ba8e3d30261 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java @@ -60,7 +60,7 @@ public class TestTaskAttemptListenerImpl { JobTokenSecretManager jobTokenSecretManager, RMHeartbeatHandler rmHeartbeatHandler, TaskHeartbeatHandler hbHandler) { - super(context, jobTokenSecretManager, rmHeartbeatHandler); + super(context, jobTokenSecretManager, rmHeartbeatHandler, null); this.taskHeartbeatHandler = hbHandler; } @@ -191,7 +191,7 @@ public class TestTaskAttemptListenerImpl { mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; @@ -245,7 +245,7 @@ public class TestTaskAttemptListenerImpl { mock(RMHeartbeatHandler.class); final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class); TaskAttemptListenerImpl listener = - new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler) { + new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, null) { @Override protected void registerHeartbeatHandler(Configuration conf) { taskHeartbeatHandler = hbHandler; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index de573fe3007..7f698c7c035 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetUtils; @@ -467,7 +468,8 @@ public class MRApp extends MRAppMaster { } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener( + AppContext context, AMPreemptionPolicy policy) { return new TaskAttemptListener(){ @Override public InetSocketAddress getAddress() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java index a6496d4e96e..baff0c069f4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java @@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssigned import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -61,6 +63,8 @@ public class MRAppBenchmark { /** * Runs memory and time benchmark with Mock MRApp. + * @param app Application to submit + * @throws Exception On application failure */ public void run(MRApp app) throws Exception { Logger rootLogger = LogManager.getRootLogger(); @@ -133,6 +137,7 @@ public class MRAppBenchmark { protected void serviceStart() throws Exception { thread = new Thread(new Runnable() { @Override + @SuppressWarnings("unchecked") public void run() { ContainerAllocatorEvent event = null; while (!Thread.currentThread().isInterrupted()) { @@ -192,7 +197,9 @@ public class MRAppBenchmark { @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, AppContext context) { - return new RMContainerAllocator(clientService, context) { + + AMPreemptionPolicy policy = new NoopAMPreemptionPolicy(); + return new RMContainerAllocator(clientService, context, policy) { @Override protected ApplicationMasterProtocol createSchedulerProxy() { return new ApplicationMasterProtocol() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 5d5af9435c6..0fabb207f27 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -247,13 +248,14 @@ public class TestFail { super(maps, reduces, false, "TimeOutTaskMRApp", true); } @Override - protected TaskAttemptListener createTaskAttemptListener(AppContext context) { + protected TaskAttemptListener createTaskAttemptListener( + AppContext context, AMPreemptionPolicy policy) { //This will create the TaskAttemptListener with TaskHeartbeatHandler //RPC servers are not started //task time out is reduced //when attempt times out, heartbeat handler will send the lost event //leading to Attempt failure - return new TaskAttemptListenerImpl(getContext(), null, null) { + return new TaskAttemptListenerImpl(getContext(), null, null, policy) { @Override public void startRpcServer(){}; @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 9a962364e92..3a6644e4349 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapreduce.v2.app; +import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; + import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isA; @@ -1428,14 +1430,15 @@ public class TestRMContainerAllocator { // Use this constructor when using a real job. MyContainerAllocator(MyResourceManager rm, ApplicationAttemptId appAttemptId, AppContext context) { - super(createMockClientService(), context); + super(createMockClientService(), context, new NoopAMPreemptionPolicy()); this.rm = rm; } // Use this constructor when you are using a mocked job. public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job) { - super(createMockClientService(), createAppContext(appAttemptId, job)); + super(createMockClientService(), createAppContext(appAttemptId, job), + new NoopAMPreemptionPolicy()); this.rm = rm; super.init(conf); super.start(); @@ -1444,7 +1447,8 @@ public class TestRMContainerAllocator { public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job, Clock clock) { super(createMockClientService(), - createAppContext(appAttemptId, job, clock)); + createAppContext(appAttemptId, job, clock), + new NoopAMPreemptionPolicy()); this.rm = rm; super.init(conf); super.start(); @@ -1671,7 +1675,8 @@ public class TestRMContainerAllocator { ApplicationId.newInstance(1, 1)); RMContainerAllocator allocator = new RMContainerAllocator( - mock(ClientService.class), appContext) { + mock(ClientService.class), appContext, + new NoopAMPreemptionPolicy()) { @Override protected void register() { } @@ -1721,7 +1726,8 @@ public class TestRMContainerAllocator { @Test public void testCompletedContainerEvent() { RMContainerAllocator allocator = new RMContainerAllocator( - mock(ClientService.class), mock(AppContext.class)); + mock(ClientService.class), mock(AppContext.class), + new NoopAMPreemptionPolicy()); TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( MRBuilderUtils.newTaskId( diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java index 85e0267b041..f7a87d1ab88 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobCounter.java @@ -45,5 +45,9 @@ public enum JobCounter { TOTAL_LAUNCHED_UBERTASKS, NUM_UBER_SUBMAPS, NUM_UBER_SUBREDUCES, - NUM_FAILED_UBERTASKS + NUM_FAILED_UBERTASKS, + TASKS_REQ_PREEMPT, + CHECKPOINTS, + CHECKPOINT_BYTES, + CHECKPOINT_TIME } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 2622ec5da66..e696b865533 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -459,7 +459,13 @@ public interface MRJobConfig { public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = MR_AM_PREFIX + "job.reduce.preemption.limit"; public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f; - + + /** + * Policy class encoding responses to preemption requests. + */ + public static final String MR_AM_PREEMPTION_POLICY = + MR_AM_PREFIX + "preemption.policy"; + /** AM ACL disabled. **/ public static final String JOB_AM_ACCESS_DISABLED = "mapreduce.job.am-access-disabled"; @@ -708,4 +714,7 @@ public interface MRJobConfig { public static final String MR_APPLICATION_TYPE = "MAPREDUCE"; + public static final String TASK_PREEMPTION = + "mapreduce.job.preemption"; + } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java new file mode 100644 index 00000000000..d2ff26d6b4d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/EnumCounter.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +public enum EnumCounter { + INPUTKEY, + INPUTVALUE, + OUTPUTRECORDS, + CHECKPOINT_BYTES, + CHECKPOINT_MS +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java new file mode 100644 index 00000000000..102b84f2483 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/checkpoint/TaskCheckpointID.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.checkpoint; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.Counters; + +/** + * Implementation of CheckpointID used in MR. It contains a reference to an + * underlying FileSsytem based checkpoint, and various metadata about the + * cost of checkpoints and other counters. This is sent by the task to the AM + * to be stored and provided to the next execution of the same task. + */ +public class TaskCheckpointID implements CheckpointID{ + + FSCheckpointID rawId; + private List partialOutput; + private Counters counters; + + public TaskCheckpointID() { + this.rawId = new FSCheckpointID(); + this.partialOutput = new ArrayList(); + } + + public TaskCheckpointID(FSCheckpointID rawId, List partialOutput, + Counters counters) { + this.rawId = rawId; + this.counters = counters; + if(partialOutput == null) + this.partialOutput = new ArrayList(); + else + this.partialOutput = partialOutput; + } + + @Override + public void write(DataOutput out) throws IOException { + counters.write(out); + if (partialOutput == null) { + WritableUtils.writeVLong(out, 0L); + } else { + WritableUtils.writeVLong(out, partialOutput.size()); + for(Path p:partialOutput){ + Text.writeString(out, p.toString()); + } + } + rawId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + partialOutput.clear(); + counters.readFields(in); + long numPout = WritableUtils.readVLong(in); + for(int i=0;i getPartialCommittedOutput() { + return partialOutput; + } + + public Counters getCounters() { + return counters; + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties index 59dff060571..42539a097b2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/org/apache/hadoop/mapreduce/JobCounter.properties @@ -27,3 +27,7 @@ SLOTS_MILLIS_MAPS.name= Total time spent by all maps in occupied slot SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces in occupied slots (ms) FALLOW_SLOTS_MILLIS_MAPS.name= Total time spent by all maps waiting after reserving slots (ms) FALLOW_SLOTS_MILLIS_REDUCES.name= Total time spent by all reduces waiting after reserving slots (ms) +TASKS_REQ_PREEMPT.name= Tasks that have been asked to preempt +CHECKPOINTS.name= Number of checkpoints reported +CHECKPOINT_BYTES.name= Total amount of bytes in checkpoints +CHECKPOINT_TIME.name= Total time spent checkpointing (ms) \ No newline at end of file