From 1e45b1f1fd38543b0b1233f57fdee1ac4a365332 Mon Sep 17 00:00:00 2001 From: Jason Darrell Lowe Date: Wed, 24 Oct 2012 15:45:35 +0000 Subject: [PATCH] MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. Contributed by Vinod Kumar Vavilapalli git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1401738 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../v2/app/launcher/ContainerLauncherImpl.java | 13 +++++++++++-- .../mapreduce/v2/app/rm/RMCommunicator.java | 6 ++++-- .../mapreduce/v2/app/rm/RMContainerAllocator.java | 15 +++++++++++---- .../v2/app/taskclean/TaskCleanerImpl.java | 13 +++++++++++-- 5 files changed, 40 insertions(+), 10 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index cb44038f5a5..9b7828d6569 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -607,6 +607,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn) + MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. + (Vinod Kumar Vavilapalli via jlowe) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index 3144ab179c9..fa97d692ee3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -81,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements protected BlockingQueue eventQueue = new LinkedBlockingQueue(); YarnRPC rpc; + private final AtomicBoolean stopped; private Container getContainer(ContainerLauncherEvent event) { ContainerId id = event.getContainerID(); @@ -237,6 +239,7 @@ public class ContainerLauncherImpl extends AbstractService implements public ContainerLauncherImpl(AppContext context) { super(ContainerLauncherImpl.class.getName()); this.context = context; + this.stopped = new AtomicBoolean(false); } @Override @@ -271,11 +274,13 @@ public class ContainerLauncherImpl extends AbstractService implements @Override public void run() { ContainerLauncherEvent event = null; - while (!Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); + if (!stopped.get()) { + LOG.error("Returning, interrupted : " + e); + } return; } int poolSize = launcherPool.getCorePoolSize(); @@ -324,6 +329,10 @@ public class ContainerLauncherImpl extends AbstractService implements } public void stop() { + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } // shutdown any containers that might be left running shutdownAllContainers(); eventHandlingThread.interrupt(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 63e92467ea9..9f594c09e86 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -67,7 +67,7 @@ public abstract class RMCommunicator extends AbstractService { private int rmPollInterval;//millis protected ApplicationId applicationId; protected ApplicationAttemptId applicationAttemptId; - private AtomicBoolean stopped; + private final AtomicBoolean stopped; protected Thread allocatorThread; @SuppressWarnings("rawtypes") protected EventHandler eventHandler; @@ -239,7 +239,9 @@ public abstract class RMCommunicator extends AbstractService { // TODO: for other exceptions } } catch (InterruptedException e) { - LOG.warn("Allocated thread interrupted. Returning."); + if (!stopped.get()) { + LOG.warn("Allocated thread interrupted. Returning."); + } return; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index f8ebfcfc6d2..fd8fa960762 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -32,6 +32,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -84,7 +85,7 @@ public class RMContainerAllocator extends RMContainerRequestor private static final Priority PRIORITY_MAP; private Thread eventHandlingThread; - private volatile boolean stopEventHandling; + private final AtomicBoolean stopped; static { PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); @@ -145,6 +146,7 @@ public class RMContainerAllocator extends RMContainerRequestor public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); + this.stopped = new AtomicBoolean(false); } @Override @@ -176,11 +178,13 @@ public class RMContainerAllocator extends RMContainerRequestor ContainerAllocatorEvent event; - while (!stopEventHandling && !Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { event = RMContainerAllocator.this.eventQueue.take(); } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); + if (!stopped.get()) { + LOG.error("Returning, interrupted : " + e); + } return; } @@ -234,7 +238,10 @@ public class RMContainerAllocator extends RMContainerRequestor @Override public void stop() { - this.stopEventHandling = true; + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } eventHandlingThread.interrupt(); super.stop(); LOG.info("Final Stats: " + getStat()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java index 8ee1d6e2eae..6c0418ebfdd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,10 +44,12 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner { private Thread eventHandlingThread; private BlockingQueue eventQueue = new LinkedBlockingQueue(); + private final AtomicBoolean stopped; public TaskCleanerImpl(AppContext context) { super("TaskCleaner"); this.context = context; + this.stopped = new AtomicBoolean(false); } public void start() { @@ -59,11 +62,13 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner { @Override public void run() { TaskCleanupEvent event = null; - while (!Thread.currentThread().isInterrupted()) { + while (!stopped.get() && !Thread.currentThread().isInterrupted()) { try { event = eventQueue.take(); } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); + if (!stopped.get()) { + LOG.error("Returning, interrupted : " + e); + } return; } // the events from the queue are handled in parallel @@ -77,6 +82,10 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner { } public void stop() { + if (stopped.getAndSet(true)) { + // return if already stopped + return; + } eventHandlingThread.interrupt(); launcherPool.shutdown(); super.stop();