svn merge -c 1401738 FIXES: MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown. Contributed by Vinod Kumar Vavilapalli

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1401741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2012-10-24 15:47:01 +00:00
parent b702473f5c
commit 33cb0be07c
5 changed files with 40 additions and 10 deletions

View File

@ -460,6 +460,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn) MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
(Vinod Kumar Vavilapalli via jlowe)
Release 0.23.4 - UNRELEASED Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -81,6 +82,7 @@ public class ContainerLauncherImpl extends AbstractService implements
protected BlockingQueue<ContainerLauncherEvent> eventQueue = protected BlockingQueue<ContainerLauncherEvent> eventQueue =
new LinkedBlockingQueue<ContainerLauncherEvent>(); new LinkedBlockingQueue<ContainerLauncherEvent>();
YarnRPC rpc; YarnRPC rpc;
private final AtomicBoolean stopped;
private Container getContainer(ContainerLauncherEvent event) { private Container getContainer(ContainerLauncherEvent event) {
ContainerId id = event.getContainerID(); ContainerId id = event.getContainerID();
@ -237,6 +239,7 @@ public class ContainerLauncherImpl extends AbstractService implements
public ContainerLauncherImpl(AppContext context) { public ContainerLauncherImpl(AppContext context) {
super(ContainerLauncherImpl.class.getName()); super(ContainerLauncherImpl.class.getName());
this.context = context; this.context = context;
this.stopped = new AtomicBoolean(false);
} }
@Override @Override
@ -271,11 +274,13 @@ public class ContainerLauncherImpl extends AbstractService implements
@Override @Override
public void run() { public void run() {
ContainerLauncherEvent event = null; ContainerLauncherEvent event = null;
while (!Thread.currentThread().isInterrupted()) { while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try { try {
event = eventQueue.take(); event = eventQueue.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e); if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return; return;
} }
int poolSize = launcherPool.getCorePoolSize(); int poolSize = launcherPool.getCorePoolSize();
@ -324,6 +329,10 @@ public class ContainerLauncherImpl extends AbstractService implements
} }
public void stop() { public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
// shutdown any containers that might be left running // shutdown any containers that might be left running
shutdownAllContainers(); shutdownAllContainers();
eventHandlingThread.interrupt(); eventHandlingThread.interrupt();

View File

@ -67,7 +67,7 @@ public abstract class RMCommunicator extends AbstractService {
private int rmPollInterval;//millis private int rmPollInterval;//millis
protected ApplicationId applicationId; protected ApplicationId applicationId;
protected ApplicationAttemptId applicationAttemptId; protected ApplicationAttemptId applicationAttemptId;
private AtomicBoolean stopped; private final AtomicBoolean stopped;
protected Thread allocatorThread; protected Thread allocatorThread;
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
protected EventHandler eventHandler; protected EventHandler eventHandler;
@ -239,7 +239,9 @@ public abstract class RMCommunicator extends AbstractService {
// TODO: for other exceptions // TODO: for other exceptions
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.warn("Allocated thread interrupted. Returning."); if (!stopped.get()) {
LOG.warn("Allocated thread interrupted. Returning.");
}
return; return;
} }
} }

View File

@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -84,7 +85,7 @@ public class RMContainerAllocator extends RMContainerRequestor
private static final Priority PRIORITY_MAP; private static final Priority PRIORITY_MAP;
private Thread eventHandlingThread; private Thread eventHandlingThread;
private volatile boolean stopEventHandling; private final AtomicBoolean stopped;
static { static {
PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
@ -145,6 +146,7 @@ public class RMContainerAllocator extends RMContainerRequestor
public RMContainerAllocator(ClientService clientService, AppContext context) { public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context); super(clientService, context);
this.stopped = new AtomicBoolean(false);
} }
@Override @Override
@ -176,11 +178,13 @@ public class RMContainerAllocator extends RMContainerRequestor
ContainerAllocatorEvent event; ContainerAllocatorEvent event;
while (!stopEventHandling && !Thread.currentThread().isInterrupted()) { while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try { try {
event = RMContainerAllocator.this.eventQueue.take(); event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e); if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return; return;
} }
@ -234,7 +238,10 @@ public class RMContainerAllocator extends RMContainerRequestor
@Override @Override
public void stop() { public void stop() {
this.stopEventHandling = true; if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventHandlingThread.interrupt(); eventHandlingThread.interrupt();
super.stop(); super.stop();
LOG.info("Final Stats: " + getStat()); LOG.info("Final Stats: " + getStat());

View File

@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -43,10 +44,12 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
private Thread eventHandlingThread; private Thread eventHandlingThread;
private BlockingQueue<TaskCleanupEvent> eventQueue = private BlockingQueue<TaskCleanupEvent> eventQueue =
new LinkedBlockingQueue<TaskCleanupEvent>(); new LinkedBlockingQueue<TaskCleanupEvent>();
private final AtomicBoolean stopped;
public TaskCleanerImpl(AppContext context) { public TaskCleanerImpl(AppContext context) {
super("TaskCleaner"); super("TaskCleaner");
this.context = context; this.context = context;
this.stopped = new AtomicBoolean(false);
} }
public void start() { public void start() {
@ -59,11 +62,13 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
@Override @Override
public void run() { public void run() {
TaskCleanupEvent event = null; TaskCleanupEvent event = null;
while (!Thread.currentThread().isInterrupted()) { while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try { try {
event = eventQueue.take(); event = eventQueue.take();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Returning, interrupted : " + e); if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return; return;
} }
// the events from the queue are handled in parallel // the events from the queue are handled in parallel
@ -77,6 +82,10 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
} }
public void stop() { public void stop() {
if (stopped.getAndSet(true)) {
// return if already stopped
return;
}
eventHandlingThread.interrupt(); eventHandlingThread.interrupt();
launcherPool.shutdown(); launcherPool.shutdown();
super.stop(); super.stop();