Add log warnings for long running event handling (#39729)

Recently we have had a number of test issues related to blocking
activity occuring on the io thread. This commit adds a log warning for
when handling event takes a >150 milliseconds. This is implemented
for the MockNioTransport which is the transport used in
ESIntegTestCase.
This commit is contained in:
Tim Brooks 2019-03-08 12:50:24 -07:00
parent 73a672b8dd
commit 5612ed97ca
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
8 changed files with 409 additions and 111 deletions

View File

@ -149,6 +149,15 @@ public class EventHandler {
context.handleException(exception);
}
/**
* This method is called when a task or listener attached to a channel is available to run.
*
* @param task to handle
*/
protected void handleTask(Runnable task) {
task.run();
}
/**
* This method is called when a task or listener attached to a channel operation throws an exception.
*
@ -165,7 +174,11 @@ public class EventHandler {
*/
protected void postHandling(SocketChannelContext context) {
if (context.selectorShouldClose()) {
try {
handleClose(context);
} catch (IOException e) {
closeException(context, e);
}
} else {
SelectionKey selectionKey = context.getSelectionKey();
boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey);
@ -203,23 +216,19 @@ public class EventHandler {
*
* @param context that should be closed
*/
protected void handleClose(ChannelContext<?> context) {
try {
protected void handleClose(ChannelContext<?> context) throws IOException {
context.closeFromSelector();
} catch (IOException e) {
closeException(context, e);
}
assert context.isOpen() == false : "Should always be done as we are on the selector thread";
}
/**
* This method is called when an attempt to close a channel throws an exception.
*
* @param channel that was being closed
* @param context that was being closed
* @param exception that occurred
*/
protected void closeException(ChannelContext<?> channel, Exception exception) {
channel.handleException(exception);
protected void closeException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}
/**
@ -227,10 +236,10 @@ public class EventHandler {
* An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw
* {@link java.nio.channels.CancelledKeyException}.
*
* @param channel that caused the exception
* @param context that caused the exception
* @param exception that was thrown
*/
protected void genericChannelException(ChannelContext<?> channel, Exception exception) {
channel.handleException(exception);
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
context.handleException(exception);
}
}

View File

@ -265,13 +265,17 @@ public class NioSelector implements Closeable {
private void handleScheduledTasks(long nanoTime) {
Runnable task;
while ((task = taskScheduler.pollTask(nanoTime)) != null) {
handleTask(task);
}
}
private void handleTask(Runnable task) {
try {
task.run();
eventHandler.handleTask(task);
} catch (Exception e) {
eventHandler.taskException(e);
}
}
}
/**
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
@ -353,11 +357,7 @@ public class NioSelector implements Closeable {
*/
public <V> void executeListener(BiConsumer<V, Exception> listener, V value) {
assertOnSelectorThread();
try {
listener.accept(value, null);
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(() -> listener.accept(value, null));
}
/**
@ -369,11 +369,7 @@ public class NioSelector implements Closeable {
*/
public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Exception exception) {
assertOnSelectorThread();
try {
listener.accept(null, exception);
} catch (Exception e) {
eventHandler.taskException(e);
}
handleTask(() -> listener.accept(null, exception));
}
private void cleanupPendingWrites() {
@ -437,7 +433,11 @@ public class NioSelector implements Closeable {
private void closePendingChannels() {
ChannelContext<?> channelContext;
while ((channelContext = channelsToClose.poll()) != null) {
try {
eventHandler.handleClose(channelContext);
} catch (Exception e) {
eventHandler.closeException(channelContext, e);
}
}
}

View File

@ -30,6 +30,7 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.mockito.Matchers.same;
@ -243,10 +244,16 @@ public class EventHandlerTests extends ESTestCase {
assertEquals(SelectionKey.OP_READ, key.interestOps());
}
public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
RuntimeException listenerException = new RuntimeException();
handler.taskException(listenerException);
verify(genericExceptionHandler).accept(listenerException);
public void testHandleTaskWillRunTask() throws Exception {
AtomicBoolean isRun = new AtomicBoolean(false);
handler.handleTask(() -> isRun.set(true));
assertTrue(isRun.get());
}
public void testTaskExceptionWillCallExceptionHandler() throws Exception {
RuntimeException exception = new RuntimeException();
handler.taskException(exception);
verify(genericExceptionHandler).accept(exception);
}
private class DoNotRegisterSocketContext extends BytesChannelContext {

View File

@ -41,6 +41,7 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -86,6 +87,10 @@ public class NioSelectorTests extends ESTestCase {
when(serverChannelContext.isOpen()).thenReturn(true);
when(serverChannelContext.getSelector()).thenReturn(selector);
when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey);
doAnswer(invocationOnMock -> {
((Runnable) invocationOnMock.getArguments()[0]).run();
return null;
}).when(eventHandler).handleTask(any());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@ -102,6 +107,23 @@ public class NioSelectorTests extends ESTestCase {
verify(eventHandler).handleClose(context);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public void testCloseException() throws IOException {
IOException ioException = new IOException();
NioChannel channel = mock(NioChannel.class);
ChannelContext context = mock(ChannelContext.class);
when(channel.getContext()).thenReturn(context);
when(context.getSelector()).thenReturn(selector);
selector.queueChannelClose(channel);
doThrow(ioException).when(eventHandler).handleClose(context);
selector.singleLoop();
verify(eventHandler).closeException(context, ioException);
}
public void testNioDelayedTasksAreExecuted() throws IOException {
AtomicBoolean isRun = new AtomicBoolean(false);
long nanoTime = System.nanoTime() - 1;
@ -113,9 +135,27 @@ public class NioSelectorTests extends ESTestCase {
assertTrue(isRun.get());
}
public void testTaskExceptionsAreHandled() {
RuntimeException taskException = new RuntimeException();
long nanoTime = System.nanoTime() - 1;
Runnable task = () -> {
throw taskException;
};
selector.getTaskScheduler().scheduleAtRelativeTime(task, nanoTime);
doAnswer((a) -> {
task.run();
return null;
}).when(eventHandler).handleTask(same(task));
selector.singleLoop();
verify(eventHandler).taskException(taskException);
}
public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException {
long delay = new TimeValue(15, TimeUnit.MINUTES).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
}, System.nanoTime() + delay);
selector.singleLoop();
verify(rawSelector).select(300);
@ -127,7 +167,8 @@ public class NioSelectorTests extends ESTestCase {
assertBusy(() -> {
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos();
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {
}, System.nanoTime() + delay);
selector.singleLoop();
verify(rawSelector).select(captor.capture());
assertTrue(captor.getValue() > 0);
@ -455,23 +496,4 @@ public class NioSelectorTests extends ESTestCase {
verify(eventHandler).handleClose(channelContext);
verify(eventHandler).handleClose(unregisteredContext);
}
public void testExecuteListenerWillHandleException() throws Exception {
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, null);
selector.executeListener(listener, null);
verify(eventHandler).taskException(exception);
}
public void testExecuteFailedListenerWillHandleException() throws Exception {
IOException ioException = new IOException();
RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).accept(null, ioException);
selector.executeFailedListener(listener, ioException);
verify(eventHandler).taskException(exception);
}
}

View File

@ -95,7 +95,7 @@ public class MockNioTransport extends TcpTransport {
boolean success = false;
try {
nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2,
(s) -> new TestingSocketEventHandler(this::onNonChannelException, s));
(s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime));
ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default");
clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client");

View File

@ -0,0 +1,216 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.nio.ChannelContext;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
public class TestEventHandler extends EventHandler {
private static final Logger logger = LogManager.getLogger(TestEventHandler.class);
private final Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
private final Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
private final LongSupplier relativeNanosSupplier;
TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, LongSupplier relativeNanosSupplier) {
super(exceptionHandler, selectorSupplier);
this.relativeNanosSupplier = relativeNanosSupplier;
}
@Override
protected void acceptChannel(ServerChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.acceptChannel(context);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void acceptException(ServerChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.acceptException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void handleRegistration(ChannelContext<?> context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.handleRegistration(context);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void registrationException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.registrationException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
public void handleConnect(SocketChannelContext context) throws IOException {
assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
long startTime = relativeNanosSupplier.getAsLong();
try {
super.handleConnect(context);
if (context.isConnectComplete()) {
hasConnectedMap.add(context);
}
} finally {
maybeLogElapsedTime(startTime);
}
}
public void connectException(SocketChannelContext context, Exception e) {
assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
hasConnectExceptionMap.add(context);
long startTime = relativeNanosSupplier.getAsLong();
try {
super.connectException(context, e);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void handleRead(SocketChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.handleRead(context);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void readException(SocketChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.readException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void handleWrite(SocketChannelContext context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.handleWrite(context);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void writeException(SocketChannelContext context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.writeException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void handleTask(Runnable task) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.handleTask(task);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void taskException(Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.taskException(exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void handleClose(ChannelContext<?> context) throws IOException {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.handleClose(context);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void closeException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.closeException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
@Override
protected void genericChannelException(ChannelContext<?> context, Exception exception) {
long startTime = relativeNanosSupplier.getAsLong();
try {
super.genericChannelException(context, exception);
} finally {
maybeLogElapsedTime(startTime);
}
}
private static final long WARN_THRESHOLD = 150;
private void maybeLogElapsedTime(long startTime) {
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime);
if (elapsedTime > WARN_THRESHOLD) {
logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime),
new RuntimeException("Slow exception on network thread"));
}
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.SocketChannelContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class TestingSocketEventHandler extends EventHandler {
private Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
public TestingSocketEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier) {
super(exceptionHandler, selectorSupplier);
}
public void handleConnect(SocketChannelContext context) throws IOException {
assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
super.handleConnect(context);
if (context.isConnectComplete()) {
hasConnectedMap.add(context);
}
}
private Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
public void connectException(SocketChannelContext context, Exception e) {
assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
hasConnectExceptionMap.add(context);
super.connectException(context, e);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import static org.mockito.Mockito.mock;
public class TestEventHandlerTests extends ESTestCase {
private MockLogAppender appender;
public void setUp() throws Exception {
super.setUp();
appender = new MockLogAppender();
Loggers.addAppender(LogManager.getLogger(TestEventHandler.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(LogManager.getLogger(TestEventHandler.class), appender);
appender.stop();
super.tearDown();
}
public void testLogOnElapsedTime() throws Exception {
long start = System.nanoTime();
long end = start + TimeUnit.MILLISECONDS.toNanos(200);
AtomicBoolean isStart = new AtomicBoolean(true);
LongSupplier timeSupplier = () -> {
if (isStart.compareAndSet(true, false)) {
return start;
} else if (isStart.compareAndSet(false, true)) {
return end;
}
throw new IllegalStateException("Cannot update isStart");
};
TestEventHandler eventHandler = new TestEventHandler((e) -> {}, () -> null, timeSupplier);
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
SocketChannelContext socketChannelContext = mock(SocketChannelContext.class);
RuntimeException exception = new RuntimeException("boom");
Map<String, CheckedRunnable<Exception>> tests = new HashMap<>();
tests.put("acceptChannel", () -> eventHandler.acceptChannel(serverChannelContext));
tests.put("acceptException", () -> eventHandler.acceptException(serverChannelContext, exception));
tests.put("registrationException", () -> eventHandler.registrationException(socketChannelContext, exception));
tests.put("handleConnect", () -> eventHandler.handleConnect(socketChannelContext));
tests.put("connectException", () -> eventHandler.connectException(socketChannelContext, exception));
tests.put("handleRead", () -> eventHandler.handleRead(socketChannelContext));
tests.put("readException", () -> eventHandler.readException(socketChannelContext, exception));
tests.put("handleWrite", () -> eventHandler.handleWrite(socketChannelContext));
tests.put("writeException", () -> eventHandler.writeException(socketChannelContext, exception));
tests.put("handleTask", () -> eventHandler.handleTask(mock(Runnable.class)));
tests.put("taskException", () -> eventHandler.taskException(exception));
tests.put("handleClose", () -> eventHandler.handleClose(socketChannelContext));
tests.put("closeException", () -> eventHandler.closeException(socketChannelContext, exception));
tests.put("genericChannelException", () -> eventHandler.genericChannelException(socketChannelContext, exception));
for (Map.Entry<String, CheckedRunnable<Exception>> entry : tests.entrySet()) {
String message = "*Slow execution on network thread*";
MockLogAppender.LoggingExpectation slowExpectation =
new MockLogAppender.SeenEventExpectation(entry.getKey(), TestEventHandler.class.getCanonicalName(), Level.WARN, message);
appender.addExpectation(slowExpectation);
entry.getValue().run();
appender.assertAllExpectationsMatched();
}
}
}