YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)

This commit is contained in:
Karthik Kambatla 2015-07-09 09:48:29 -07:00
parent 527c40e4d6
commit aa067c6aa4
4 changed files with 87 additions and 14 deletions

View File

@ -624,6 +624,9 @@ Release 2.7.2 - UNRELEASED
YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka) YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
draining events on stop. (Varun Saxena via kasha)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -55,9 +55,6 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
// stop functionality. // stop functionality.
private volatile boolean drainEventsOnStop = false; private volatile boolean drainEventsOnStop = false;
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
private volatile boolean drained = true;
private Object waitForDrained = new Object(); private Object waitForDrained = new Object();
// For drainEventsOnStop enabled only, block newly coming events into the // For drainEventsOnStop enabled only, block newly coming events into the
@ -84,13 +81,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
@Override @Override
public void run() { public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) { while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// blockNewEvents is only set when dispatcher is draining to stop, // blockNewEvents is only set when dispatcher is draining to stop,
// adding this check is to avoid the overhead of acquiring the lock // adding this check is to avoid the overhead of acquiring the lock
// and calling notify every time in the normal run of the loop. // and calling notify every time in the normal run of the loop.
if (blockNewEvents) { if (blockNewEvents) {
synchronized (waitForDrained) { synchronized (waitForDrained) {
if (drained) { if (eventQueue.isEmpty()) {
waitForDrained.notify(); waitForDrained.notify();
} }
} }
@ -139,7 +135,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
blockNewEvents = true; blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
synchronized (waitForDrained) { synchronized (waitForDrained) {
while (!drained && eventHandlingThread.isAlive()) { while (!eventQueue.isEmpty() && eventHandlingThread.isAlive()) {
waitForDrained.wait(1000); waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState()); eventHandlingThread.getState());
@ -223,12 +219,21 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
return handlerInstance; return handlerInstance;
} }
/**
 * Reports whether the dispatcher's event queue currently holds any events.
 *
 * @return {@code true} when {@code eventQueue} is non-empty, {@code false}
 *         otherwise
 */
@VisibleForTesting
protected boolean hasPendingEvents() {
  final boolean queueIsEmpty = eventQueue.isEmpty();
  return !queueIsEmpty;
}
/**
 * Reports whether the event handling thread is in the
 * {@link Thread.State#WAITING} state.
 *
 * @return {@code true} when the thread state is {@code WAITING}
 */
@VisibleForTesting
protected boolean isEventThreadWaiting() {
  final Thread.State currentState = eventHandlingThread.getState();
  return Thread.State.WAITING == currentState;
}
class GenericEventHandler implements EventHandler<Event> { class GenericEventHandler implements EventHandler<Event> {
public void handle(Event event) { public void handle(Event event) {
if (blockNewEvents) { if (blockNewEvents) {
return; return;
} }
drained = false;
/* all this method does is enqueue all the events onto the queue */ /* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size(); int qSize = eventQueue.size();
@ -285,9 +290,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
} }
}; };
} }
@VisibleForTesting
protected boolean isDrained() {
return this.drained;
}
} }

View File

@ -27,15 +27,24 @@ public class DrainDispatcher extends AsyncDispatcher {
this(new LinkedBlockingQueue<Event>()); this(new LinkedBlockingQueue<Event>());
} }
private DrainDispatcher(BlockingQueue<Event> eventQueue) { public DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue); super(eventQueue);
} }
/**
 * Busy-waits until the event thread enters the WAITING state (i.e. it is
 * waiting for new events), yielding the processor between checks.
 */
public void waitForEventThreadToWait() {
  for (;;) {
    if (isEventThreadWaiting()) {
      return;
    }
    Thread.yield();
  }
}
/** /**
* Busy loop waiting for all queued events to drain. * Busy loop waiting for all queued events to drain.
*/ */
public void await() { public void await() {
while (!isDrained()) { while (hasPendingEvents()) {
Thread.yield(); Thread.yield();
} }
} }

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.event;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
public class TestAsyncDispatcher {

  /**
   * Checks that the dispatcher does not hang on close when both of the
   * following hold:
   * <ol>
   *   <li>a thread that was putting an event onto the event queue is
   *       interrupted (simulated here by a spied queue whose {@code put}
   *       throws {@link InterruptedException} for the mocked event);</li>
   *   <li>the event queue is empty at the time of close.</li>
   * </ol>
   * A hang shows up as a failure through the JUnit timeout on this test.
   */
  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test(timeout=10000)
  public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
    // Queue spy that fails the put of one specific mocked event.
    Event mockedEvent = mock(Event.class);
    BlockingQueue<Event> spiedQueue = spy(new LinkedBlockingQueue<Event>());
    doThrow(new InterruptedException()).when(spiedQueue).put(mockedEvent);

    // Dispatcher configured to drain its events when stopped.
    DrainDispatcher dispatcher = new DrainDispatcher(spiedQueue);
    dispatcher.init(new Configuration());
    dispatcher.setDrainEventsOnStop();
    dispatcher.start();

    // Let the event handler thread start and begin waiting for events
    // before the failing put is attempted.
    dispatcher.waitForEventThreadToWait();

    try {
      dispatcher.getEventHandler().handle(mockedEvent);
    } catch (YarnRuntimeException ignored) {
      // Deliberately ignored: only the dispatcher's behaviour after the
      // failed put is under test here.
    }

    // Nothing was enqueued, so the queue must be empty; close() must then
    // return instead of waiting forever for a drain.
    Assert.assertTrue("Event Queue should have been empty",
        spiedQueue.isEmpty());
    dispatcher.close();
  }
}