diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 97e3b5b24fc..fb2993811a8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -33,6 +33,9 @@ Release 2.6.3 - UNRELEASED YARN-3925. ContainerLogsUtils#getContainerLogFile fails to read container log files from full disks. (zhihai xu via jlowe) + YARN-3878. AsyncDispatcher can hang while stopping if it is configured for + draining events on stop. (Varun Saxena via kasha) + Release 2.6.2 - 2015-10-28 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 0a869fdaa8f..b7be2559bdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -35,6 +35,8 @@ import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * Dispatches {@link Event}s in a separate thread. Currently only single thread * does that. Potentially there could be multiple channels for each event type @@ -250,11 +252,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } + // Need to reset drained flag to true if event queue is empty, + // otherwise dispatcher will hang on stop. + drained = eventQueue.isEmpty(); throw new YarnRuntimeException(e); } }; } + @VisibleForTesting + protected boolean isEventThreadWaiting() { + return eventHandlingThread.getState() == Thread.State.WAITING; + } + /** * Multiplexing an event. Sending it to different handlers that * are interested in the event. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 803b2bb2b3b..a48d6e9ea41 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -34,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher { this(new LinkedBlockingQueue()); } - private DrainDispatcher(BlockingQueue eventQueue) { + public DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); this.queue = eventQueue; this.mutex = this; @@ -49,6 +49,15 @@ public class DrainDispatcher extends AsyncDispatcher { } } + /** + * Wait till event thread enters WAITING state (i.e. waiting for new events). + */ + public void waitForEventThreadToWait() { + while (!isEventThreadWaiting()) { + Thread.yield(); + } + } + @Override Runnable createThread() { return new Runnable() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java new file mode 100644 index 00000000000..b5fd9236d92 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Assert; +import org.junit.Test; + +public class TestAsyncDispatcher { + + /* This test checks whether dispatcher hangs on close if following two things + * happen : + * 1. A thread which was putting event to event queue is interrupted. + * 2. Event queue is empty on close. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test(timeout=10000) + public void testDispatcherOnCloseIfQueueEmpty() throws Exception { + BlockingQueue eventQueue = spy(new LinkedBlockingQueue()); + Event event = mock(Event.class); + doThrow(new InterruptedException()).when(eventQueue).put(event); + DrainDispatcher disp = new DrainDispatcher(eventQueue); + disp.init(new Configuration()); + disp.setDrainEventsOnStop(); + disp.start(); + // Wait for event handler thread to start and begin waiting for events. + disp.waitForEventThreadToWait(); + try { + disp.getEventHandler().handle(event); + } catch (YarnRuntimeException e) { + } + // Queue should be empty and dispatcher should not hang on close + Assert.assertTrue("Event Queue should have been empty", + eventQueue.isEmpty()); + disp.close(); + } +} +