YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. (Varun Saxena via kasha)

This commit is contained in:
Sangjin Lee 2015-11-23 14:14:50 -08:00
parent 16e3dc24c5
commit a9001a210a
4 changed files with 85 additions and 1 deletions

View File

@ -33,6 +33,9 @@ Release 2.6.3 - UNRELEASED
YARN-3925. ContainerLogsUtils#getContainerLogFile fails to read container
log files from full disks. (zhihai xu via jlowe)
YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
draining events on stop. (Varun Saxena via kasha)
Release 2.6.2 - 2015-10-28
INCOMPATIBLE CHANGES

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
/**
* Dispatches {@link Event}s in a separate thread. Currently only single thread
* does that. Potentially there could be multiple channels for each event type
@ -250,11 +252,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
}
@VisibleForTesting
protected boolean isEventThreadWaiting() {
return eventHandlingThread.getState() == Thread.State.WAITING;
}
/**
* Multiplexing an event. Sending it to different handlers that
* are interested in the event.

View File

@ -34,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher {
this(new LinkedBlockingQueue<Event>());
}
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
public DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
this.queue = eventQueue;
this.mutex = this;
@ -49,6 +49,15 @@ public class DrainDispatcher extends AsyncDispatcher {
}
}
/**
* Wait till event thread enters WAITING state (i.e. waiting for new events).
*/
public void waitForEventThreadToWait() {
while (!isEventThreadWaiting()) {
Thread.yield();
}
}
@Override
Runnable createThread() {
return new Runnable() {

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.event;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
public class TestAsyncDispatcher {
/* This test checks whether dispatcher hangs on close if following two things
* happen :
* 1. A thread which was putting event to event queue is interrupted.
* 2. Event queue is empty on close.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout=10000)
public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
Event event = mock(Event.class);
doThrow(new InterruptedException()).when(eventQueue).put(event);
DrainDispatcher disp = new DrainDispatcher(eventQueue);
disp.init(new Configuration());
disp.setDrainEventsOnStop();
disp.start();
// Wait for event handler thread to start and begin waiting for events.
disp.waitForEventThreadToWait();
try {
disp.getEventHandler().handle(event);
} catch (YarnRuntimeException e) {
}
// Queue should be empty and dispatcher should not hang on close
Assert.assertTrue("Event Queue should have been empty",
eventQueue.isEmpty());
disp.close();
}
}