YARN-3878. AsyncDispatcher can hang while stopping if it is configured for draining events on stop. Contributed by Varun Saxena

(cherry picked from commit 393fe71771)
This commit is contained in:
Jian He 2015-07-21 15:05:41 -07:00
parent 3e793224f9
commit 65af0dffa5
4 changed files with 83 additions and 1 deletions

View File

@ -29,6 +29,9 @@ Release 2.7.2 - UNRELEASED
YARN-3535. Scheduler must re-request container resources when RMContainer transitions YARN-3535. Scheduler must re-request container resources when RMContainer transitions
from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh) from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)
YARN-3878. AsyncDispatcher can hang while stopping if it is configured for
draining events on stop. (Varun Saxena via jianhe)
Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -245,6 +245,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
if (!stopped) { if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e); LOG.warn("AsyncDispatcher thread interrupted", e);
} }
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
} }
}; };
@ -285,6 +288,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}; };
} }
@VisibleForTesting
protected boolean isEventThreadWaiting() {
return eventHandlingThread.getState() == Thread.State.WAITING;
}
@VisibleForTesting @VisibleForTesting
protected boolean isDrained() { protected boolean isDrained() {
return this.drained; return this.drained;

View File

@ -27,10 +27,19 @@ public class DrainDispatcher extends AsyncDispatcher {
this(new LinkedBlockingQueue<Event>()); this(new LinkedBlockingQueue<Event>());
} }
private DrainDispatcher(BlockingQueue<Event> eventQueue) { public DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue); super(eventQueue);
} }
/**
* Wait till event thread enters WAITING state (i.e. waiting for new events).
*/
public void waitForEventThreadToWait() {
while (!isEventThreadWaiting()) {
Thread.yield();
}
}
/** /**
* Busy loop waiting for all queued events to drain. * Busy loop waiting for all queued events to drain.
*/ */

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.event;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
public class TestAsyncDispatcher {
/* This test checks whether dispatcher hangs on close if following two things
* happen :
* 1. A thread which was putting event to event queue is interrupted.
* 2. Event queue is empty on close.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test(timeout=10000)
public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
BlockingQueue<Event> eventQueue = spy(new LinkedBlockingQueue<Event>());
Event event = mock(Event.class);
doThrow(new InterruptedException()).when(eventQueue).put(event);
DrainDispatcher disp = new DrainDispatcher(eventQueue);
disp.init(new Configuration());
disp.setDrainEventsOnStop();
disp.start();
// Wait for event handler thread to start and begin waiting for events.
disp.waitForEventThreadToWait();
try {
disp.getEventHandler().handle(event);
} catch (YarnRuntimeException e) {
}
// Queue should be empty and dispatcher should not hang on close
Assert.assertTrue("Event Queue should have been empty",
eventQueue.isEmpty());
disp.close();
}
}