From 9aa4411325801e706d71a1fca81df110157d1e99 Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 13 Jul 2015 14:30:35 -0700 Subject: [PATCH] Revert "YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)" This reverts commit aa067c6aa47b4c79577096817acc00ad6421180c. (cherry picked from commit 2466460d4cd13ad5837c044476b26e63082c1d37) --- hadoop-yarn-project/CHANGES.txt | 3 - .../hadoop/yarn/event/AsyncDispatcher.java | 24 ++++---- .../hadoop/yarn/event/DrainDispatcher.java | 13 +--- .../yarn/event/TestAsyncDispatcher.java | 61 ------------------- 4 files changed, 14 insertions(+), 87 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index edb6f7bb417..668ee793b69 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -20,9 +20,6 @@ Release 2.7.2 - UNRELEASED YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka) - YARN-3878. AsyncDispatcher can hang while stopping if it is configured for - draining events on stop. (Varun Saxena via kasha) - Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 38350f85293..d36d841772a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -55,6 +55,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { // stop functionality. private volatile boolean drainEventsOnStop = false; + // Indicates all the remaining dispatcher's events on stop have been drained + // and processed. + private volatile boolean drained = true; private Object waitForDrained = new Object(); // For drainEventsOnStop enabled only, block newly coming events into the @@ -81,12 +84,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { @Override public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. if (blockNewEvents) { synchronized (waitForDrained) { - if (eventQueue.isEmpty()) { + if (drained) { waitForDrained.notify(); } } @@ -135,7 +139,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); synchronized (waitForDrained) { - while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) { + while (!drained && eventHandlingThread.isAlive()) { waitForDrained.wait(1000); LOG.info("Waiting for AsyncDispatcher to drain."); } @@ -218,21 +222,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { return handlerInstance; } - @VisibleForTesting - protected boolean hasPendingEvents() { - return !eventQueue.isEmpty(); - } - - @VisibleForTesting - protected boolean isEventThreadWaiting() { - return eventHandlingThread.getState() == Thread.State.WAITING; - } - class GenericEventHandler implements EventHandler { public void handle(Event event) { if (blockNewEvents) { return; } + drained = false; /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); @@ -289,4 +284,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } }; } + + @VisibleForTesting + protected boolean isDrained() { + return this.drained; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index d1f4fe9eb88..da5ae443ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -27,24 +27,15 @@ public class DrainDispatcher extends AsyncDispatcher { this(new LinkedBlockingQueue()); } - public DrainDispatcher(BlockingQueue eventQueue) { + private DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); } - /** - * Wait till event thread enters WAITING state (i.e. waiting for new events). - */ - public void waitForEventThreadToWait() { - while (!isEventThreadWaiting()) { - Thread.yield(); - } - } - /** * Busy loop waiting for all queued events to drain. */ public void await() { - while (hasPendingEvents()) { + while (!isDrained()) { Thread.yield(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java deleted file mode 100644 index ee17ddd66f9..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.event; - -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.junit.Assert; -import org.junit.Test; - -public class TestAsyncDispatcher { - - /* This test checks whether dispatcher hangs on close if following two things - * happen : - * 1. A thread which was putting event to event queue is interrupted. - * 2. Event queue is empty on close. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Test(timeout=10000) - public void testDispatcherOnCloseIfQueueEmpty() throws Exception { - BlockingQueue eventQueue = spy(new LinkedBlockingQueue()); - Event event = mock(Event.class); - doThrow(new InterruptedException()).when(eventQueue).put(event); - DrainDispatcher disp = new DrainDispatcher(eventQueue); - disp.init(new Configuration()); - disp.setDrainEventsOnStop(); - disp.start(); - // Wait for event handler thread to start and begin waiting for events. - disp.waitForEventThreadToWait(); - try { - disp.getEventHandler().handle(event); - } catch (YarnRuntimeException e) { - } - // Queue should be empty and dispatcher should not hang on close - Assert.assertTrue("Event Queue should have been empty", - eventQueue.isEmpty()); - disp.close(); - } -}