YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)

(cherry picked from commit aa067c6aa4)
(cherry picked from commit ccf18705f7)
This commit is contained in:
Karthik Kambatla 2015-07-09 09:48:29 -07:00
parent 7e5f73a952
commit 8bb8006b71
4 changed files with 87 additions and 14 deletions

View File

@ -20,6 +20,9 @@ Release 2.7.2 - UNRELEASED
YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
draining events on stop. (Varun Saxena via kasha)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -55,9 +55,6 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
// stop functionality.
private volatile boolean drainEventsOnStop = false;
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
private volatile boolean drained = true;
private Object waitForDrained = new Object();
// For drainEventsOnStop enabled only, block newly coming events into the
@ -84,13 +81,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop.
if (blockNewEvents) {
synchronized (waitForDrained) {
if (drained) {
if (eventQueue.isEmpty()) {
waitForDrained.notify();
}
}
@ -139,7 +135,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) {
while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain.");
}
@ -222,12 +218,21 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
return handlerInstance;
}
@VisibleForTesting
protected boolean hasPendingEvents() {
return !eventQueue.isEmpty();
}
@VisibleForTesting
protected boolean isEventThreadWaiting() {
return eventHandlingThread.getState() == Thread.State.WAITING;
}
class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
@ -284,9 +289,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
};
}
@VisibleForTesting
protected boolean isDrained() {
return this.drained;
}
}

View File

@ -27,15 +27,24 @@ public class DrainDispatcher extends AsyncDispatcher {
this(new LinkedBlockingQueue<Event>());
}
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
public DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
}
/**
* Wait till event thread enters WAITING state (i.e. waiting for new events).
*/
public void waitForEventThreadToWait() {
while (!isEventThreadWaiting()) {
Thread.yield();
}
}
/**
* Busy loop waiting for all queued events to drain.
*/
public void await() {
while (!isDrained()) {
while (hasPendingEvents()) {
Thread.yield();
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.event;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
public class TestAsyncDispatcher {
/* This test checks whether dispatcher hangs on close if following two things
* happen :
* 1. A thread which was putting event to event queue is interrupted.
* 2. Event queue is empty on close.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout=10000)
public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
Event event = mock(Event.class);
doThrow(new InterruptedException()).when(eventQueue).put(event);
DrainDispatcher disp = new DrainDispatcher(eventQueue);
disp.init(new Configuration());
disp.setDrainEventsOnStop();
disp.start();
// Wait for event handler thread to start and begin waiting for events.
disp.waitForEventThreadToWait();
try {
disp.getEventHandler().handle(event);
} catch (YarnRuntimeException e) {
}
// Queue should be empty and dispatcher should not hang on close
Assert.assertTrue("Event Queue should have been empty",
eventQueue.isEmpty());
disp.close();
}
}