Add TLS/SSL channel close timeouts (#37246)
Closing a channel using TLS/SSL requires reading and writing a CLOSE_NOTIFY message (for pre-1.3 TLS versions). Many implementations do not actually send the CLOSE_NOTIFY message, which means we are depending on the TCP close from the other side to ensure channels are closed. In case there is an issue with this, we need a timeout. This commit adds a timeout to the channel close process for TLS secured channels. As part of this change, we need a timer service. We could use the generic Elasticsearch timeout threadpool. However, it would be nice to have a local to the nio event loop timer service dedicated to network needs. In the future this service could support read timeouts, connect timeouts, request timeouts, etc. This commit adds a basic priority queue backed service. Since our timeout volume (channel closes) is very low, this should be fine. However, this can be updated to something more efficient in the future if needed (timer wheel). Everything being local to the event loop thread makes the logic simple as no locking or synchronization is necessary.
This commit is contained in:
parent
13b8bad2b8
commit
cfa58a51af
|
@ -150,11 +150,11 @@ public class EventHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method is called when a listener attached to a channel operation throws an exception.
|
* This method is called when a task or listener attached to a channel operation throws an exception.
|
||||||
*
|
*
|
||||||
* @param exception that occurred
|
* @param exception that occurred
|
||||||
*/
|
*/
|
||||||
protected void listenerException(Exception exception) {
|
protected void taskException(Exception exception) {
|
||||||
exceptionHandler.accept(exception);
|
exceptionHandler.accept(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
@ -54,6 +55,7 @@ public class NioSelector implements Closeable {
|
||||||
private final Selector selector;
|
private final Selector selector;
|
||||||
private final ByteBuffer ioBuffer;
|
private final ByteBuffer ioBuffer;
|
||||||
|
|
||||||
|
private final TaskScheduler taskScheduler = new TaskScheduler();
|
||||||
private final ReentrantLock runLock = new ReentrantLock();
|
private final ReentrantLock runLock = new ReentrantLock();
|
||||||
private final CountDownLatch exitedLoop = new CountDownLatch(1);
|
private final CountDownLatch exitedLoop = new CountDownLatch(1);
|
||||||
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
private final AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||||
|
@ -81,6 +83,10 @@ public class NioSelector implements Closeable {
|
||||||
return ioBuffer;
|
return ioBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TaskScheduler getTaskScheduler() {
|
||||||
|
return taskScheduler;
|
||||||
|
}
|
||||||
|
|
||||||
public Selector rawSelector() {
|
public Selector rawSelector() {
|
||||||
return selector;
|
return selector;
|
||||||
}
|
}
|
||||||
|
@ -145,8 +151,16 @@ public class NioSelector implements Closeable {
|
||||||
try {
|
try {
|
||||||
closePendingChannels();
|
closePendingChannels();
|
||||||
preSelect();
|
preSelect();
|
||||||
|
long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime());
|
||||||
int ready = selector.select(300);
|
int ready;
|
||||||
|
if (nanosUntilNextTask == 0) {
|
||||||
|
ready = selector.selectNow();
|
||||||
|
} else {
|
||||||
|
long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask);
|
||||||
|
// Only select until the next task needs to be run. Do not select with a value of 0 because
|
||||||
|
// that blocks without a timeout.
|
||||||
|
ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1)));
|
||||||
|
}
|
||||||
if (ready > 0) {
|
if (ready > 0) {
|
||||||
Set<SelectionKey> selectionKeys = selector.selectedKeys();
|
Set<SelectionKey> selectionKeys = selector.selectedKeys();
|
||||||
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
|
||||||
|
@ -164,6 +178,8 @@ public class NioSelector implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleScheduledTasks(System.nanoTime());
|
||||||
} catch (ClosedSelectorException e) {
|
} catch (ClosedSelectorException e) {
|
||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -245,6 +261,17 @@ public class NioSelector implements Closeable {
|
||||||
handleQueuedWrites();
|
handleQueuedWrites();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleScheduledTasks(long nanoTime) {
|
||||||
|
Runnable task;
|
||||||
|
while ((task = taskScheduler.pollTask(nanoTime)) != null) {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} catch (Exception e) {
|
||||||
|
eventHandler.taskException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
|
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
|
||||||
* api available for non-selector threads to schedule writes.
|
* api available for non-selector threads to schedule writes.
|
||||||
|
@ -267,8 +294,10 @@ public class NioSelector implements Closeable {
|
||||||
ChannelContext<?> context = channel.getContext();
|
ChannelContext<?> context = channel.getContext();
|
||||||
assert context.getSelector() == this : "Must schedule a channel for closure with its selector";
|
assert context.getSelector() == this : "Must schedule a channel for closure with its selector";
|
||||||
channelsToClose.offer(context);
|
channelsToClose.offer(context);
|
||||||
ensureSelectorOpenForEnqueuing(channelsToClose, context);
|
if (isOnCurrentThread() == false) {
|
||||||
wakeup();
|
ensureSelectorOpenForEnqueuing(channelsToClose, context);
|
||||||
|
wakeup();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -324,7 +353,7 @@ public class NioSelector implements Closeable {
|
||||||
try {
|
try {
|
||||||
listener.accept(value, null);
|
listener.accept(value, null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
eventHandler.listenerException(e);
|
eventHandler.taskException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -340,7 +369,7 @@ public class NioSelector implements Closeable {
|
||||||
try {
|
try {
|
||||||
listener.accept(null, exception);
|
listener.accept(null, exception);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
eventHandler.listenerException(e);
|
eventHandler.taskException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -234,6 +234,9 @@ public abstract class SocketChannelContext extends ChannelContext<SocketChannel>
|
||||||
return closeNow;
|
return closeNow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setCloseNow() {
|
||||||
|
closeNow = true;
|
||||||
|
}
|
||||||
|
|
||||||
// When you read or write to a nio socket in java, the heap memory passed down must be copied to/from
|
// When you read or write to a nio socket in java, the heap memory passed down must be copied to/from
|
||||||
// direct memory. The JVM internally does some buffering of the direct memory, however we can save space
|
// direct memory. The JVM internally does some buffering of the direct memory, however we can save space
|
||||||
|
|
|
@ -0,0 +1,92 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.nio;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.PriorityQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A basic priority queue backed timer service. The service is thread local and should only be used by a
|
||||||
|
* single nio selector event loop thread.
|
||||||
|
*/
|
||||||
|
public class TaskScheduler {
|
||||||
|
|
||||||
|
private final PriorityQueue<DelayedTask> tasks = new PriorityQueue<>(Comparator.comparingLong(DelayedTask::getDeadline));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Schedule a task at the defined relative nanotime. When {@link #pollTask(long)} is called with a
|
||||||
|
* relative nanotime after the scheduled time, the task will be returned. This method returns a
|
||||||
|
* {@link Runnable} that can be run to cancel the scheduled task.
|
||||||
|
*
|
||||||
|
* @param task to schedule
|
||||||
|
* @param relativeNanos defining when to execute the task
|
||||||
|
* @return runnable that will cancel the task
|
||||||
|
*/
|
||||||
|
public Runnable scheduleAtRelativeTime(Runnable task, long relativeNanos) {
|
||||||
|
DelayedTask delayedTask = new DelayedTask(relativeNanos, task);
|
||||||
|
tasks.offer(delayedTask);
|
||||||
|
return delayedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
Runnable pollTask(long relativeNanos) {
|
||||||
|
DelayedTask task;
|
||||||
|
while ((task = tasks.peek()) != null) {
|
||||||
|
if (relativeNanos - task.deadline >= 0) {
|
||||||
|
tasks.remove();
|
||||||
|
if (task.cancelled == false) {
|
||||||
|
return task.runnable;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
long nanosUntilNextTask(long relativeNanos) {
|
||||||
|
DelayedTask nextTask = tasks.peek();
|
||||||
|
if (nextTask == null) {
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
} else {
|
||||||
|
return Math.max(nextTask.deadline - relativeNanos, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class DelayedTask implements Runnable {
|
||||||
|
|
||||||
|
private final long deadline;
|
||||||
|
private final Runnable runnable;
|
||||||
|
private boolean cancelled = false;
|
||||||
|
|
||||||
|
private DelayedTask(long deadline, Runnable runnable) {
|
||||||
|
this.deadline = deadline;
|
||||||
|
this.runnable = runnable;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getDeadline() {
|
||||||
|
return deadline;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
cancelled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -245,7 +245,7 @@ public class EventHandlerTests extends ESTestCase {
|
||||||
|
|
||||||
public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
|
public void testListenerExceptionCallsGenericExceptionHandler() throws IOException {
|
||||||
RuntimeException listenerException = new RuntimeException();
|
RuntimeException listenerException = new RuntimeException();
|
||||||
handler.listenerException(listenerException);
|
handler.taskException(listenerException);
|
||||||
verify(genericExceptionHandler).accept(listenerException);
|
verify(genericExceptionHandler).accept(listenerException);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,8 +19,10 @@
|
||||||
|
|
||||||
package org.elasticsearch.nio;
|
package org.elasticsearch.nio;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
@ -31,6 +33,8 @@ import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
@ -98,6 +102,39 @@ public class NioSelectorTests extends ESTestCase {
|
||||||
verify(eventHandler).handleClose(context);
|
verify(eventHandler).handleClose(context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testNioDelayedTasksAreExecuted() throws IOException {
|
||||||
|
AtomicBoolean isRun = new AtomicBoolean(false);
|
||||||
|
long nanoTime = System.nanoTime() - 1;
|
||||||
|
selector.getTaskScheduler().scheduleAtRelativeTime(() -> isRun.set(true), nanoTime);
|
||||||
|
|
||||||
|
assertFalse(isRun.get());
|
||||||
|
selector.singleLoop();
|
||||||
|
verify(rawSelector).selectNow();
|
||||||
|
assertTrue(isRun.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultSelectorTimeoutIsUsedIfNoTaskSooner() throws IOException {
|
||||||
|
long delay = new TimeValue(15, TimeUnit.MINUTES).nanos();
|
||||||
|
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
|
||||||
|
|
||||||
|
selector.singleLoop();
|
||||||
|
verify(rawSelector).select(300);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSelectorTimeoutWillBeReducedIfTaskSooner() throws Exception {
|
||||||
|
// As this is a timing based test, we must assertBusy in the very small chance that the loop is
|
||||||
|
// delayed for 50 milliseconds (causing a selectNow())
|
||||||
|
assertBusy(() -> {
|
||||||
|
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
|
||||||
|
long delay = new TimeValue(50, TimeUnit.MILLISECONDS).nanos();
|
||||||
|
selector.getTaskScheduler().scheduleAtRelativeTime(() -> {}, System.nanoTime() + delay);
|
||||||
|
selector.singleLoop();
|
||||||
|
verify(rawSelector).select(captor.capture());
|
||||||
|
assertTrue(captor.getValue() > 0);
|
||||||
|
assertTrue(captor.getValue() < 300);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
|
public void testSelectorClosedExceptionIsNotCaughtWhileRunning() throws IOException {
|
||||||
boolean closedSelectorExceptionCaught = false;
|
boolean closedSelectorExceptionCaught = false;
|
||||||
when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException());
|
when(rawSelector.select(anyInt())).thenThrow(new ClosedSelectorException());
|
||||||
|
@ -425,7 +462,7 @@ public class NioSelectorTests extends ESTestCase {
|
||||||
|
|
||||||
selector.executeListener(listener, null);
|
selector.executeListener(listener, null);
|
||||||
|
|
||||||
verify(eventHandler).listenerException(exception);
|
verify(eventHandler).taskException(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testExecuteFailedListenerWillHandleException() throws Exception {
|
public void testExecuteFailedListenerWillHandleException() throws Exception {
|
||||||
|
@ -435,6 +472,6 @@ public class NioSelectorTests extends ESTestCase {
|
||||||
|
|
||||||
selector.executeFailedListener(listener, ioException);
|
selector.executeFailedListener(listener, ioException);
|
||||||
|
|
||||||
verify(eventHandler).listenerException(exception);
|
verify(eventHandler).taskException(exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.nio;
|
||||||
|
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
|
||||||
|
public class TaskSchedulerTests extends ESTestCase {
|
||||||
|
|
||||||
|
private TaskScheduler scheduler = new TaskScheduler();
|
||||||
|
|
||||||
|
public void testScheduleTask() {
|
||||||
|
AtomicBoolean complete = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> complete.set(true), executeTime);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
long nanoTime = System.nanoTime();
|
||||||
|
Runnable runnable = scheduler.pollTask(nanoTime);
|
||||||
|
if (nanoTime - executeTime >= 0) {
|
||||||
|
runnable.run();
|
||||||
|
assertTrue(complete.get());
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
assertNull(runnable);
|
||||||
|
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPollScheduleTaskAtExactTime() {
|
||||||
|
long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> {}, executeTime);
|
||||||
|
|
||||||
|
assertNull(scheduler.pollTask(executeTime - 1));
|
||||||
|
assertNotNull(scheduler.pollTask(executeTime));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTaskOrdering() {
|
||||||
|
AtomicBoolean first = new AtomicBoolean(false);
|
||||||
|
AtomicBoolean second = new AtomicBoolean(false);
|
||||||
|
AtomicBoolean third = new AtomicBoolean(false);
|
||||||
|
long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> third.set(true), executeTime + 2);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> first.set(true), executeTime);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1);
|
||||||
|
|
||||||
|
scheduler.pollTask(executeTime + 10).run();
|
||||||
|
assertTrue(first.get());
|
||||||
|
assertFalse(second.get());
|
||||||
|
assertFalse(third.get());
|
||||||
|
scheduler.pollTask(executeTime + 10).run();
|
||||||
|
assertTrue(first.get());
|
||||||
|
assertTrue(second.get());
|
||||||
|
assertFalse(third.get());
|
||||||
|
scheduler.pollTask(executeTime + 10).run();
|
||||||
|
assertTrue(first.get());
|
||||||
|
assertTrue(second.get());
|
||||||
|
assertTrue(third.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTaskCancel() {
|
||||||
|
AtomicBoolean first = new AtomicBoolean(false);
|
||||||
|
AtomicBoolean second = new AtomicBoolean(false);
|
||||||
|
long executeTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10);
|
||||||
|
Runnable cancellable = scheduler.scheduleAtRelativeTime(() -> first.set(true), executeTime);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> second.set(true), executeTime + 1);
|
||||||
|
|
||||||
|
cancellable.run();
|
||||||
|
scheduler.pollTask(executeTime + 10).run();
|
||||||
|
assertFalse(first.get());
|
||||||
|
assertTrue(second.get());
|
||||||
|
assertNull(scheduler.pollTask(executeTime + 10));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNanosUntilNextTask() {
|
||||||
|
long nanoTime = System.nanoTime();
|
||||||
|
long executeTime = nanoTime + TimeUnit.MILLISECONDS.toNanos(10);
|
||||||
|
scheduler.scheduleAtRelativeTime(() -> {}, executeTime);
|
||||||
|
assertEquals(TimeUnit.MILLISECONDS.toNanos(10), scheduler.nanosUntilNextTask(nanoTime));
|
||||||
|
assertEquals(TimeUnit.MILLISECONDS.toNanos(5), scheduler.nanosUntilNextTask(nanoTime + TimeUnit.MILLISECONDS.toNanos(5)));
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,6 +5,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.security.transport.nio;
|
package org.elasticsearch.xpack.security.transport.nio;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.nio.FlushOperation;
|
import org.elasticsearch.nio.FlushOperation;
|
||||||
import org.elasticsearch.nio.InboundChannelBuffer;
|
import org.elasticsearch.nio.InboundChannelBuffer;
|
||||||
|
@ -16,6 +17,7 @@ import org.elasticsearch.nio.WriteOperation;
|
||||||
|
|
||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -28,7 +30,11 @@ import java.util.function.Predicate;
|
||||||
*/
|
*/
|
||||||
public final class SSLChannelContext extends SocketChannelContext {
|
public final class SSLChannelContext extends SocketChannelContext {
|
||||||
|
|
||||||
|
private static final long CLOSE_TIMEOUT_NANOS = new TimeValue(10, TimeUnit.SECONDS).nanos();
|
||||||
|
private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {};
|
||||||
|
|
||||||
private final SSLDriver sslDriver;
|
private final SSLDriver sslDriver;
|
||||||
|
private Runnable closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
|
||||||
|
|
||||||
SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
|
SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler, SSLDriver sslDriver,
|
||||||
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
|
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
|
||||||
|
@ -53,6 +59,8 @@ public final class SSLChannelContext extends SocketChannelContext {
|
||||||
getSelector().assertOnSelectorThread();
|
getSelector().assertOnSelectorThread();
|
||||||
if (writeOperation instanceof CloseNotifyOperation) {
|
if (writeOperation instanceof CloseNotifyOperation) {
|
||||||
sslDriver.initiateClose();
|
sslDriver.initiateClose();
|
||||||
|
long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
|
||||||
|
closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
|
||||||
} else {
|
} else {
|
||||||
super.queueWriteOperation(writeOperation);
|
super.queueWriteOperation(writeOperation);
|
||||||
}
|
}
|
||||||
|
@ -161,6 +169,7 @@ public final class SSLChannelContext extends SocketChannelContext {
|
||||||
public void closeFromSelector() throws IOException {
|
public void closeFromSelector() throws IOException {
|
||||||
getSelector().assertOnSelectorThread();
|
getSelector().assertOnSelectorThread();
|
||||||
if (channel.isOpen()) {
|
if (channel.isOpen()) {
|
||||||
|
closeTimeoutCanceller.run();
|
||||||
IOUtils.close(super::closeFromSelector, sslDriver::close);
|
IOUtils.close(super::closeFromSelector, sslDriver::close);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -169,6 +178,12 @@ public final class SSLChannelContext extends SocketChannelContext {
|
||||||
return sslDriver.getSSLEngine();
|
return sslDriver.getSSLEngine();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void channelCloseTimeout() {
|
||||||
|
closeTimeoutCanceller = DEFAULT_TIMEOUT_CANCELLER;
|
||||||
|
setCloseNow();
|
||||||
|
getSelector().queueChannelClose(channel);
|
||||||
|
}
|
||||||
|
|
||||||
private static class CloseNotifyOperation implements WriteOperation {
|
private static class CloseNotifyOperation implements WriteOperation {
|
||||||
|
|
||||||
private static final BiConsumer<Void, Exception> LISTENER = (v, t) -> {};
|
private static final BiConsumer<Void, Exception> LISTENER = (v, t) -> {};
|
||||||
|
|
|
@ -12,6 +12,7 @@ import org.elasticsearch.nio.FlushReadyWrite;
|
||||||
import org.elasticsearch.nio.InboundChannelBuffer;
|
import org.elasticsearch.nio.InboundChannelBuffer;
|
||||||
import org.elasticsearch.nio.NioSelector;
|
import org.elasticsearch.nio.NioSelector;
|
||||||
import org.elasticsearch.nio.NioSocketChannel;
|
import org.elasticsearch.nio.NioSocketChannel;
|
||||||
|
import org.elasticsearch.nio.TaskScheduler;
|
||||||
import org.elasticsearch.nio.WriteOperation;
|
import org.elasticsearch.nio.WriteOperation;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -26,9 +27,11 @@ import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
import static org.mockito.Matchers.same;
|
import static org.mockito.Matchers.same;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -41,6 +44,7 @@ public class SSLChannelContextTests extends ESTestCase {
|
||||||
private SSLChannelContext context;
|
private SSLChannelContext context;
|
||||||
private InboundChannelBuffer channelBuffer;
|
private InboundChannelBuffer channelBuffer;
|
||||||
private NioSelector selector;
|
private NioSelector selector;
|
||||||
|
private TaskScheduler nioTimer;
|
||||||
private BiConsumer<Void, Exception> listener;
|
private BiConsumer<Void, Exception> listener;
|
||||||
private Consumer exceptionHandler;
|
private Consumer exceptionHandler;
|
||||||
private SSLDriver sslDriver;
|
private SSLDriver sslDriver;
|
||||||
|
@ -56,6 +60,7 @@ public class SSLChannelContextTests extends ESTestCase {
|
||||||
|
|
||||||
messageLength = randomInt(96) + 20;
|
messageLength = randomInt(96) + 20;
|
||||||
selector = mock(NioSelector.class);
|
selector = mock(NioSelector.class);
|
||||||
|
nioTimer = mock(TaskScheduler.class);
|
||||||
listener = mock(BiConsumer.class);
|
listener = mock(BiConsumer.class);
|
||||||
channel = mock(NioSocketChannel.class);
|
channel = mock(NioSocketChannel.class);
|
||||||
rawChannel = mock(SocketChannel.class);
|
rawChannel = mock(SocketChannel.class);
|
||||||
|
@ -66,6 +71,7 @@ public class SSLChannelContextTests extends ESTestCase {
|
||||||
context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer);
|
context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer);
|
||||||
|
|
||||||
when(selector.isOnCurrentThread()).thenReturn(true);
|
when(selector.isOnCurrentThread()).thenReturn(true);
|
||||||
|
when(selector.getTaskScheduler()).thenReturn(nioTimer);
|
||||||
when(sslDriver.getNetworkReadBuffer()).thenReturn(readBuffer);
|
when(sslDriver.getNetworkReadBuffer()).thenReturn(readBuffer);
|
||||||
when(sslDriver.getNetworkWriteBuffer()).thenReturn(writeBuffer);
|
when(sslDriver.getNetworkWriteBuffer()).thenReturn(writeBuffer);
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(1 << 14);
|
ByteBuffer buffer = ByteBuffer.allocate(1 << 14);
|
||||||
|
@ -334,6 +340,44 @@ public class SSLChannelContextTests extends ESTestCase {
|
||||||
assertTrue(context.selectorShouldClose());
|
assertTrue(context.selectorShouldClose());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCloseTimeout() {
|
||||||
|
context.closeChannel();
|
||||||
|
|
||||||
|
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
|
||||||
|
verify(selector).writeToChannel(captor.capture());
|
||||||
|
|
||||||
|
ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||||
|
Runnable cancellable = mock(Runnable.class);
|
||||||
|
when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable);
|
||||||
|
context.queueWriteOperation(captor.getValue());
|
||||||
|
verify(nioTimer).scheduleAtRelativeTime(taskCaptor.capture(), anyLong());
|
||||||
|
assertFalse(context.selectorShouldClose());
|
||||||
|
taskCaptor.getValue().run();
|
||||||
|
assertTrue(context.selectorShouldClose());
|
||||||
|
verify(selector).queueChannelClose(channel);
|
||||||
|
verify(cancellable, never()).run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testCloseTimeoutIsCancelledOnClose() throws IOException {
|
||||||
|
try (SocketChannel realChannel = SocketChannel.open()) {
|
||||||
|
when(channel.getRawChannel()).thenReturn(realChannel);
|
||||||
|
TestReadWriteHandler readWriteHandler = new TestReadWriteHandler(readConsumer);
|
||||||
|
context = new SSLChannelContext(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer);
|
||||||
|
context.closeChannel();
|
||||||
|
ArgumentCaptor<WriteOperation> captor = ArgumentCaptor.forClass(WriteOperation.class);
|
||||||
|
verify(selector).writeToChannel(captor.capture());
|
||||||
|
ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
|
||||||
|
Runnable cancellable = mock(Runnable.class);
|
||||||
|
when(nioTimer.scheduleAtRelativeTime(taskCaptor.capture(), anyLong())).thenReturn(cancellable);
|
||||||
|
context.queueWriteOperation(captor.getValue());
|
||||||
|
|
||||||
|
when(channel.isOpen()).thenReturn(true);
|
||||||
|
context.closeFromSelector();
|
||||||
|
verify(cancellable).run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testInitiateCloseFromDifferentThreadSchedulesCloseNotify() {
|
public void testInitiateCloseFromDifferentThreadSchedulesCloseNotify() {
|
||||||
when(selector.isOnCurrentThread()).thenReturn(false, true);
|
when(selector.isOnCurrentThread()).thenReturn(false, true);
|
||||||
context.closeChannel();
|
context.closeChannel();
|
||||||
|
|
Loading…
Reference in New Issue