YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu.

This commit is contained in:
Peter Bacsko 2021-03-08 14:22:14 +01:00
parent 03ac2e41c0
commit 0d032d7b68
2 changed files with 69 additions and 10 deletions

View File

@ -20,11 +20,11 @@ package org.apache.hadoop.yarn.event;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -265,11 +265,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
} }
class GenericEventHandler implements EventHandler<Event> { class GenericEventHandler implements EventHandler<Event> {
private void printEventQueueDetails(BlockingQueue<Event> queue) { private void printEventQueueDetails() {
Map<Enum, Long> counterMap = eventQueue.stream(). Iterator<Event> iterator = eventQueue.iterator();
collect(Collectors. Map<Enum, Long> counterMap = new HashMap<>();
groupingBy(e -> e.getType(), Collectors.counting()) while (iterator.hasNext()) {
); Enum eventType = iterator.next().getType();
if (!counterMap.containsKey(eventType)) {
counterMap.put(eventType, 0L);
}
counterMap.put(eventType, counterMap.get(eventType) + 1);
}
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) { for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
long num = entry.getValue(); long num = entry.getValue();
LOG.info("Event type: " + entry.getKey() LOG.info("Event type: " + entry.getKey()
@ -292,7 +297,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
if (qSize != 0 && qSize % detailsInterval == 0 if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) { && lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize; lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails(eventQueue); printEventQueueDetails();
printTrigger = true; printTrigger = true;
} }
int remCapacity = eventQueue.remainingCapacity(); int remCapacity = eventQueue.remainingCapacity();

View File

@ -97,12 +97,23 @@ public class TestAsyncDispatcher {
} }
private static class TestHandler implements EventHandler<Event> { private static class TestHandler implements EventHandler<Event> {
// Milliseconds that handle() sleeps for each event; 1500 unless a
// different value is supplied through the one-argument constructor.
private long sleepTime = 1500;
// Creates a handler that keeps the default sleep time.
TestHandler() {
}
// Creates a handler that sleeps for the given number of milliseconds
// per handled event (0 means the handler effectively does not block).
TestHandler(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override @Override
public void handle(Event event) { public void handle(Event event) {
try { try {
// As long as 10000 events queued // As long as 10000 events queued
Thread.sleep(1500); Thread.sleep(this.sleepTime);
} catch (InterruptedException e) {} } catch (InterruptedException e) {
}
} }
} }
@ -170,11 +181,54 @@ public class TestAsyncDispatcher {
//Make sure more than one event to take //Make sure more than one event to take
verify(log, atLeastOnce()). verify(log, atLeastOnce()).
info("Latest dispatch event type: TestEventType"); info("Latest dispatch event type: TestEventType");
dispatcher.stop();
} finally { } finally {
//... restore logger object //... restore logger object
logger.set(null, oldLog); logger.set(null, oldLog);
dispatcher.stop();
} }
} }
/**
 * Exercises event-detail printing while the blocking queue is heavily
 * loaded, running the scenario five times in a row.
 *
 * The whole test is bounded by a 60 second timeout. The scenario is
 * repeated presumably to raise the chance of hitting the race in which the
 * dispatcher gets stuck -- confirm against YARN-10642.
 */
@Test(timeout = 60000)
public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
  int round = 0;
  while (round < 5) {
    testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
    round++;
  }
}
/**
 * Runs one round of the heavy-queue scenario: starts an AsyncDispatcher
 * whose static LOG field has been swapped for a mock, pushes 10000 mocked
 * events through its event handler, waits three seconds, then restores the
 * logger and stops the dispatcher.
 *
 * This method makes no assertions of its own. A dispatcher that gets stuck
 * would presumably surface through the @Test timeout of the calling test --
 * confirm.
 *
 * @throws Exception if the reflective logger swap fails or the thread is
 *         interrupted while sleeping
 */
public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
// Use a print-events-info threshold of 10.
// NOTE(review): presumably this makes the dispatcher print queue details
// very frequently while events pile up -- confirm against AsyncDispatcher.
conf.setInt(YarnConfiguration.
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10);
Log log = mock(Log.class);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
// Reflectively clear the FINAL modifier on AsyncDispatcher.LOG so the
// field can be replaced with the mock for the duration of the round.
// NOTE(review): this relies on the private Field.modifiers field being
// reachable via reflection; verify it works on the JDKs this test targets.
Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
logger.setAccessible(true);
Field modifiers = Field.class.getDeclaredField("modifiers");
modifiers.setAccessible(true);
modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
// Remember the real logger so it can be put back in the finally block.
Object oldLog = logger.get(null);
try {
logger.set(null, log);
// A sleep time of 0 means the handler does not delay event processing.
dispatcher.register(TestEnum.class, new TestHandler(0));
dispatcher.start();
// Submit 10000 mocked events, all reporting TestEnum.TestEventType.
for (int i = 0; i < 10000; ++i) {
Event event = mock(Event.class);
when(event.getType()).thenReturn(TestEnum.TestEventType);
dispatcher.getEventHandler().handle(event);
}
// Wait three seconds after the last submission before tearing down.
Thread.sleep(3000);
} finally {
//... restore logger object
logger.set(null, oldLog);
dispatcher.stop();
}
}
} }