diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 7eebfe24665..4a6fed2bb34 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -149,6 +149,15 @@ public class EventHandler { context.handleException(exception); } + /** + * This method is called when a task or listener attached to a channel is available to run. + * + * @param task to handle + */ + protected void handleTask(Runnable task) { + task.run(); + } + /** * This method is called when a task or listener attached to a channel operation throws an exception. * @@ -165,7 +174,11 @@ public class EventHandler { */ protected void postHandling(SocketChannelContext context) { if (context.selectorShouldClose()) { - handleClose(context); + try { + handleClose(context); + } catch (IOException e) { + closeException(context, e); + } } else { SelectionKey selectionKey = context.getSelectionKey(); boolean currentlyWriteInterested = SelectionKeyUtils.isWriteInterested(selectionKey); @@ -203,23 +216,19 @@ public class EventHandler { * * @param context that should be closed */ - protected void handleClose(ChannelContext context) { - try { - context.closeFromSelector(); - } catch (IOException e) { - closeException(context, e); - } + protected void handleClose(ChannelContext context) throws IOException { + context.closeFromSelector(); assert context.isOpen() == false : "Should always be done as we are on the selector thread"; } /** * This method is called when an attempt to close a channel throws an exception. * - * @param channel that was being closed + * @param context that was being closed * @param exception that occurred */ - protected void closeException(ChannelContext channel, Exception exception) { - channel.handleException(exception); + protected void closeException(ChannelContext context, Exception exception) { + context.handleException(exception); } /** @@ -227,10 +236,10 @@ public class EventHandler { * An example would be if checking ready ops on a {@link java.nio.channels.SelectionKey} threw * {@link java.nio.channels.CancelledKeyException}. * - * @param channel that caused the exception + * @param context that caused the exception * @param exception that was thrown */ - protected void genericChannelException(ChannelContext channel, Exception exception) { - channel.handleException(exception); + protected void genericChannelException(ChannelContext context, Exception exception) { + context.handleException(exception); } } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java index c89703c78c8..1d0af24ae2c 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -265,11 +265,15 @@ public class NioSelector implements Closeable { private void handleScheduledTasks(long nanoTime) { Runnable task; while ((task = taskScheduler.pollTask(nanoTime)) != null) { - try { - task.run(); - } catch (Exception e) { - eventHandler.taskException(e); - } + handleTask(task); + } + } + + private void handleTask(Runnable task) { + try { + eventHandler.handleTask(task); + } catch (Exception e) { + eventHandler.taskException(e); } } @@ -353,11 +357,7 @@ public class NioSelector implements Closeable { */ public void executeListener(BiConsumer listener, V value) { assertOnSelectorThread(); - try { - listener.accept(value, null); - } catch (Exception e) { - eventHandler.taskException(e); - } + handleTask(() -> listener.accept(value, null)); } /** @@ -369,11 +369,7 @@ public class NioSelector implements Closeable { */ public void executeFailedListener(BiConsumer listener, Exception exception) { assertOnSelectorThread(); - try { - listener.accept(null, exception); - } catch (Exception e) { - eventHandler.taskException(e); - } + handleTask(() -> listener.accept(null, exception)); } private void cleanupPendingWrites() { @@ -437,7 +433,11 @@ public class NioSelector implements Closeable { private void closePendingChannels() { ChannelContext channelContext; while ((channelContext = channelsToClose.poll()) != null) { - eventHandler.handleClose(channelContext); + try { + eventHandler.handleClose(channelContext); + } catch (Exception e) { + eventHandler.closeException(channelContext, e); + } } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java index f3ffab1baef..1b42e8be60d 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/EventHandlerTests.java @@ -30,6 +30,7 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import static org.mockito.Matchers.same; @@ -243,10 +244,16 @@ public class EventHandlerTests extends ESTestCase { assertEquals(SelectionKey.OP_READ, key.interestOps()); } - public void testListenerExceptionCallsGenericExceptionHandler() throws IOException { - RuntimeException listenerException = new RuntimeException(); - handler.taskException(listenerException); - verify(genericExceptionHandler).accept(listenerException); + public void testHandleTaskWillRunTask() throws Exception { + AtomicBoolean isRun = new AtomicBoolean(false); + handler.handleTask(() -> isRun.set(true)); + assertTrue(isRun.get()); + } + + public void testTaskExceptionWillCallExceptionHandler() throws Exception { + RuntimeException exception = new RuntimeException(); + handler.taskException(exception); + verify(genericExceptionHandler).accept(exception); } private class DoNotRegisterSocketContext extends BytesChannelContext { diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index 8cde769cca3..7a641315fe2 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -41,6 +41,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -86,6 +87,10 @@ public class NioSelectorTests extends ESTestCase { when(serverChannelContext.isOpen()).thenReturn(true); when(serverChannelContext.getSelector()).thenReturn(selector); when(serverChannelContext.getSelectionKey()).thenReturn(selectionKey); + doAnswer(invocationOnMock -> { + ((Runnable) invocationOnMock.getArguments()[0]).run(); + return null; + }).when(eventHandler).handleTask(any()); } @SuppressWarnings({"unchecked", "rawtypes"}) @@ -102,6 +107,23 @@ public class NioSelectorTests extends ESTestCase { verify(eventHandler).handleClose(context); } + @SuppressWarnings({"unchecked", "rawtypes"}) + public void testCloseException() throws IOException { + IOException ioException = new IOException(); + NioChannel channel = mock(NioChannel.class); + ChannelContext context = mock(ChannelContext.class); + when(channel.getContext()).thenReturn(context); + when(context.getSelector()).thenReturn(selector); + + selector.queueChannelClose(channel); + + doThrow(ioException).when(eventHandler).handleClose(context); + + selector.singleLoop(); + + verify(eventHandler).closeException(context, ioException); + } + public void testNioDelayedTasksAreExecuted() throws IOException { AtomicBoolean isRun = new AtomicBoolean(false); long nanoTime = System.nanoTime() - 1; @@ -113,9 +135,27 @@ public class NioSelectorTests extends ESTestCase { assertTrue(isRun.get()); } + public void testTaskExceptionsAreHandled() { + RuntimeException taskException = new RuntimeException(); + long nanoTime = System.nanoTime() - 1; + Runnable task = () -> { + throw taskException; + }; + selector.getTaskScheduler().scheduleAtRelativeTime(task, nanoTime); + + doAnswer((a) -> { + task.run(); + return null; + }).when(eventHandler).handleTask(same(task)); + + selector.singleLoop(); + verify(eventHandler).taskException(taskException); + } + public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException { long delay = new TimeValue(15, TimeUnit.MINUTES).nanos(); - selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> { + }, System.nanoTime() + delay); selector.singleLoop(); verify(rawSelector).select(300); @@ -127,7 +167,8 @@ public class NioSelectorTests extends ESTestCase { assertBusy(() -> { ArgumentCaptor captor = ArgumentCaptor.forClass(Long.class); long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos(); - selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay); + selector.getTaskScheduler().scheduleAtRelativeTime(() -> { + }, System.nanoTime() + delay); selector.singleLoop(); verify(rawSelector).select(captor.capture()); assertTrue(captor.getValue() > 0); @@ -455,23 +496,4 @@ public class NioSelectorTests extends ESTestCase { verify(eventHandler).handleClose(channelContext); verify(eventHandler).handleClose(unregisteredContext); } - - public void testExecuteListenerWillHandleException() throws Exception { - RuntimeException exception = new RuntimeException(); - doThrow(exception).when(listener).accept(null, null); - - selector.executeListener(listener, null); - - verify(eventHandler).taskException(exception); - } - - public void testExecuteFailedListenerWillHandleException() throws Exception { - IOException ioException = new IOException(); - RuntimeException exception = new RuntimeException(); - doThrow(exception).when(listener).accept(null, ioException); - - selector.executeFailedListener(listener, ioException); - - verify(eventHandler).taskException(exception); - } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index a7dbce45d3c..0282e3dcc6a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -95,7 +95,7 @@ public class MockNioTransport extends TcpTransport { boolean success = false; try { nioGroup = new NioSelectorGroup(daemonThreadFactory(this.settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX), 2, - (s) -> new TestingSocketEventHandler(this::onNonChannelException, s)); + (s) -> new TestEventHandler(this::onNonChannelException, s, System::nanoTime)); ProfileSettings clientProfileSettings = new ProfileSettings(settings, "default"); clientChannelFactory = new MockTcpChannelFactory(true, clientProfileSettings, "client"); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java new file mode 100644 index 00000000000..a70ecb0c59e --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestEventHandler.java @@ -0,0 +1,216 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.nio.ChannelContext; +import org.elasticsearch.nio.EventHandler; +import org.elasticsearch.nio.NioSelector; +import org.elasticsearch.nio.ServerChannelContext; +import org.elasticsearch.nio.SocketChannelContext; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; + +public class TestEventHandler extends EventHandler { + + private static final Logger logger = LogManager.getLogger(TestEventHandler.class); + + private final Set hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>()); + private final Set hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>()); + private final LongSupplier relativeNanosSupplier; + + TestEventHandler(Consumer exceptionHandler, Supplier selectorSupplier, LongSupplier relativeNanosSupplier) { + super(exceptionHandler, selectorSupplier); + this.relativeNanosSupplier = relativeNanosSupplier; + } + + @Override + protected void acceptChannel(ServerChannelContext context) throws IOException { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.acceptChannel(context); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void acceptException(ServerChannelContext context, Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.acceptException(context, exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void handleRegistration(ChannelContext context) throws IOException { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.handleRegistration(context); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void registrationException(ChannelContext context, Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.registrationException(context, exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + public void handleConnect(SocketChannelContext context) throws IOException { + assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected"; + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.handleConnect(context); + if (context.isConnectComplete()) { + hasConnectedMap.add(context); + } + } finally { + maybeLogElapsedTime(startTime); + } + } + + public void connectException(SocketChannelContext context, Exception e) { + assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel"; + hasConnectExceptionMap.add(context); + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.connectException(context, e); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void handleRead(SocketChannelContext context) throws IOException { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.handleRead(context); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void readException(SocketChannelContext context, Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.readException(context, exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void handleWrite(SocketChannelContext context) throws IOException { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.handleWrite(context); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void writeException(SocketChannelContext context, Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.writeException(context, exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void handleTask(Runnable task) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.handleTask(task); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void taskException(Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.taskException(exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void handleClose(ChannelContext context) throws IOException { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.handleClose(context); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void closeException(ChannelContext context, Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.closeException(context, exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + @Override + protected void genericChannelException(ChannelContext context, Exception exception) { + long startTime = relativeNanosSupplier.getAsLong(); + try { + super.genericChannelException(context, exception); + } finally { + maybeLogElapsedTime(startTime); + } + } + + private static final long WARN_THRESHOLD = 150; + + private void maybeLogElapsedTime(long startTime) { + long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime); + if (elapsedTime > WARN_THRESHOLD) { + logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime), + new RuntimeException("Slow exception on network thread")); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java deleted file mode 100644 index cecd3c60613..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/TestingSocketEventHandler.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.nio; - -import org.elasticsearch.nio.EventHandler; -import org.elasticsearch.nio.NioSelector; -import org.elasticsearch.nio.SocketChannelContext; - -import java.io.IOException; -import java.util.Collections; -import java.util.Set; -import java.util.WeakHashMap; -import java.util.function.Consumer; -import java.util.function.Supplier; - -public class TestingSocketEventHandler extends EventHandler { - - private Set hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>()); - - public TestingSocketEventHandler(Consumer exceptionHandler, Supplier selectorSupplier) { - super(exceptionHandler, selectorSupplier); - } - - public void handleConnect(SocketChannelContext context) throws IOException { - assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected"; - super.handleConnect(context); - if (context.isConnectComplete()) { - hasConnectedMap.add(context); - } - } - - private Set hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>()); - - public void connectException(SocketChannelContext context, Exception e) { - assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel"; - hasConnectExceptionMap.add(context); - super.connectException(context, e); - } -} diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java new file mode 100644 index 00000000000..2a570eb59b6 --- /dev/null +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/TestEventHandlerTests.java @@ -0,0 +1,100 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.nio; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.elasticsearch.common.CheckedRunnable; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.nio.ServerChannelContext; +import org.elasticsearch.nio.SocketChannelContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; + +import static org.mockito.Mockito.mock; + +public class TestEventHandlerTests extends ESTestCase { + + private MockLogAppender appender; + + public void setUp() throws Exception { + super.setUp(); + appender = new MockLogAppender(); + Loggers.addAppender(LogManager.getLogger(TestEventHandler.class), appender); + appender.start(); + } + + public void tearDown() throws Exception { + Loggers.removeAppender(LogManager.getLogger(TestEventHandler.class), appender); + appender.stop(); + super.tearDown(); + } + + public void testLogOnElapsedTime() throws Exception { + long start = System.nanoTime(); + long end = start + TimeUnit.MILLISECONDS.toNanos(200); + AtomicBoolean isStart = new AtomicBoolean(true); + LongSupplier timeSupplier = () -> { + if (isStart.compareAndSet(true, false)) { + return start; + } else if (isStart.compareAndSet(false, true)) { + return end; + } + throw new IllegalStateException("Cannot update isStart"); + }; + TestEventHandler eventHandler = new TestEventHandler((e) -> {}, () -> null, timeSupplier); + + ServerChannelContext serverChannelContext = mock(ServerChannelContext.class); + SocketChannelContext socketChannelContext = mock(SocketChannelContext.class); + RuntimeException exception = new RuntimeException("boom"); + + Map> tests = new HashMap<>(); + + tests.put("acceptChannel", () -> eventHandler.acceptChannel(serverChannelContext)); + tests.put("acceptException", () -> eventHandler.acceptException(serverChannelContext, exception)); + tests.put("registrationException", () -> eventHandler.registrationException(socketChannelContext, exception)); + tests.put("handleConnect", () -> eventHandler.handleConnect(socketChannelContext)); + tests.put("connectException", () -> eventHandler.connectException(socketChannelContext, exception)); + tests.put("handleRead", () -> eventHandler.handleRead(socketChannelContext)); + tests.put("readException", () -> eventHandler.readException(socketChannelContext, exception)); + tests.put("handleWrite", () -> eventHandler.handleWrite(socketChannelContext)); + tests.put("writeException", () -> eventHandler.writeException(socketChannelContext, exception)); + tests.put("handleTask", () -> eventHandler.handleTask(mock(Runnable.class))); + tests.put("taskException", () -> eventHandler.taskException(exception)); + tests.put("handleClose", () -> eventHandler.handleClose(socketChannelContext)); + tests.put("closeException", () -> eventHandler.closeException(socketChannelContext, exception)); + tests.put("genericChannelException", () -> eventHandler.genericChannelException(socketChannelContext, exception)); + + for (Map.Entry> entry : tests.entrySet()) { + String message = "*Slow execution on network thread*"; + MockLogAppender.LoggingExpectation slowExpectation = + new MockLogAppender.SeenEventExpectation(entry.getKey(), TestEventHandler.class.getCanonicalName(), Level.WARN, message); + appender.addExpectation(slowExpectation); + entry.getValue().run(); + appender.assertAllExpectationsMatched(); + } + } +}