Fixes #10786 - TLS handshake failures leak HttpConnection.RequestTimeouts tasks
Moved the call to destroy the CyclicTimeouts to a close() call that is always called. Fixed NPE in ManagedSelector.getTotalKeys(). Fixed exception handling to avoid infinite recursion in SslConnection. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
e98c0ae05d
commit
fa166d09c5
|
@ -255,10 +255,7 @@ public class HttpClient extends ContainerLifeCycle
|
|||
decoderFactories.clear();
|
||||
handlers.clear();
|
||||
|
||||
for (HttpDestination destination : destinations.values())
|
||||
{
|
||||
destination.close();
|
||||
}
|
||||
destinations.values().forEach(HttpDestination::close);
|
||||
destinations.clear();
|
||||
|
||||
requestListeners.clear();
|
||||
|
|
|
@ -308,7 +308,7 @@ public abstract class HttpConnection implements IConnection, Attachable
|
|||
return attachment;
|
||||
}
|
||||
|
||||
protected void destroy()
|
||||
public void destroy()
|
||||
{
|
||||
requestTimeouts.destroy();
|
||||
}
|
||||
|
|
|
@ -242,6 +242,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
getEndPoint().close();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Closed {}", this);
|
||||
delegate.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -339,7 +340,6 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
|
|||
public void close()
|
||||
{
|
||||
HttpConnectionOverHTTP.this.close();
|
||||
destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -261,6 +261,8 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
getEndPoint().close();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Closed {}", this);
|
||||
|
||||
delegate.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -387,7 +389,6 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
|
|||
public void close()
|
||||
{
|
||||
HttpConnectionOverFCGI.this.close();
|
||||
destroy();
|
||||
}
|
||||
|
||||
protected void close(Throwable failure)
|
||||
|
|
|
@ -202,7 +202,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
public void close()
|
||||
{
|
||||
close(new AsynchronousCloseException());
|
||||
destroy();
|
||||
}
|
||||
|
||||
protected void close(Throwable failure)
|
||||
|
@ -221,6 +220,8 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
channel.destroy();
|
||||
channel = idleChannels.poll();
|
||||
}
|
||||
|
||||
destroy();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -148,7 +148,8 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
@ManagedAttribute(value = "Total number of keys", readonly = true)
|
||||
public int getTotalKeys()
|
||||
{
|
||||
return _selector.keys().size();
|
||||
Selector selector = _selector;
|
||||
return selector == null ? 0 : selector.keys().size();
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "Average number of selected keys", readonly = true)
|
||||
|
@ -555,15 +556,16 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("updateable {}", _updateable.size());
|
||||
|
||||
Selector selector = _selector;
|
||||
for (SelectorUpdate update : _updateable)
|
||||
{
|
||||
if (_selector == null)
|
||||
if (selector == null)
|
||||
break;
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("update {}", update);
|
||||
update.update(_selector);
|
||||
update.update(selector);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
|
@ -572,13 +574,13 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
|
|||
}
|
||||
_updateable.clear();
|
||||
|
||||
Selector selector;
|
||||
int updates;
|
||||
try (AutoLock l = _lock.lock())
|
||||
{
|
||||
updates = _updates.size();
|
||||
_selecting = updates == 0;
|
||||
selector = _selecting ? null : _selector;
|
||||
if (_selecting)
|
||||
selector = null;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
|
@ -1546,7 +1546,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} stored {} exception", this, context, x);
|
||||
}
|
||||
else if (x != _failure)
|
||||
else if (x != _failure && x.getCause() != _failure)
|
||||
{
|
||||
_failure.addSuppressed(x);
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLEngineResult;
|
||||
import javax.net.ssl.SSLException;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.ssl.SslConnection;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
||||
public class TLSHandshakeFailureTest extends AbstractTest<TransportScenario>
|
||||
{
|
||||
@Override
|
||||
public void init(Transport transport) throws IOException
|
||||
{
|
||||
setScenario(new TransportScenario(transport));
|
||||
assumeTrue(scenario.transport.isTlsBased());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testTLSWrapAbruptSSLEngineClose(Transport transport) throws Exception
|
||||
{
|
||||
TLSHandshakeAction action = SSLEngine::closeOutbound;
|
||||
testTLSWrapFailure(transport, action, 1);
|
||||
testTLSWrapFailure(transport, action, 2);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testTLSWrapAbruptSSLEngineFailure(Transport transport) throws Exception
|
||||
{
|
||||
TLSHandshakeAction action = sslEngine ->
|
||||
{
|
||||
throw new SSLException("test");
|
||||
};
|
||||
testTLSWrapFailure(transport, action, 1);
|
||||
testTLSWrapFailure(transport, action, 2);
|
||||
}
|
||||
|
||||
private void testTLSWrapFailure(Transport transport, TLSHandshakeAction action, int wrapCount) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
scenario.start(new EmptyServerHandler());
|
||||
scenario.client.stop();
|
||||
scenario.client = new HttpClient(scenario.client.getTransport())
|
||||
{
|
||||
@Override
|
||||
protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory.Client sslContextFactory, ClientConnectionFactory connectionFactory)
|
||||
{
|
||||
if (sslContextFactory == null)
|
||||
sslContextFactory = getSslContextFactory();
|
||||
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
|
||||
{
|
||||
@Override
|
||||
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
|
||||
{
|
||||
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
|
||||
{
|
||||
private final AtomicInteger wraps = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
protected SSLEngineResult wrap(SSLEngine sslEngine, ByteBuffer[] input, ByteBuffer output) throws SSLException
|
||||
{
|
||||
if (wraps.incrementAndGet() == wrapCount)
|
||||
action.accept(sslEngine);
|
||||
return super.wrap(sslEngine, input, output);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
ScheduledThreadPoolExecutor schedulerService = new ScheduledThreadPoolExecutor(1);
|
||||
schedulerService.setRemoveOnCancelPolicy(true);
|
||||
scenario.client.setScheduler(new ScheduledExecutorScheduler(schedulerService));
|
||||
scenario.client.start();
|
||||
|
||||
int count = 10;
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
assertThrows(ExecutionException.class, () -> scenario.client.newRequest(scenario.newURI())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send()
|
||||
);
|
||||
}
|
||||
|
||||
// There should be a task scheduled by HttpDestination
|
||||
// to expire HttpExchanges while they are queued.
|
||||
// There may be also a task for the idle timeout of the connection.
|
||||
List<Runnable> tasks = schedulerService.shutdownNow();
|
||||
assertThat(tasks.toString(), tasks.size(), allOf(greaterThan(0), lessThan(count + 1)));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testTLSUnwrapAbruptSSLEngineClose(Transport transport) throws Exception
|
||||
{
|
||||
TLSHandshakeAction action = SSLEngine::closeInbound;
|
||||
testTLSUnwrapFailure(transport, action, 1);
|
||||
testTLSUnwrapFailure(transport, action, 2);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(TransportProvider.class)
|
||||
public void testTLSUnwrapAbruptSSLEngineFailure(Transport transport) throws Exception
|
||||
{
|
||||
TLSHandshakeAction action = sslEngine ->
|
||||
{
|
||||
throw new SSLException("test");
|
||||
};
|
||||
testTLSUnwrapFailure(transport, action, 1);
|
||||
testTLSUnwrapFailure(transport, action, 2);
|
||||
}
|
||||
|
||||
private void testTLSUnwrapFailure(Transport transport, TLSHandshakeAction action, int unwrapCount) throws Exception
|
||||
{
|
||||
init(transport);
|
||||
scenario.start(new EmptyServerHandler());
|
||||
scenario.client.stop();
|
||||
scenario.client = new HttpClient(scenario.client.getTransport())
|
||||
{
|
||||
@Override
|
||||
protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactory.Client sslContextFactory, ClientConnectionFactory connectionFactory)
|
||||
{
|
||||
if (sslContextFactory == null)
|
||||
sslContextFactory = getSslContextFactory();
|
||||
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
|
||||
{
|
||||
@Override
|
||||
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
|
||||
{
|
||||
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
|
||||
{
|
||||
private final AtomicInteger unwraps = new AtomicInteger();
|
||||
|
||||
@Override
|
||||
protected SSLEngineResult unwrap(SSLEngine sslEngine, ByteBuffer input, ByteBuffer output) throws SSLException
|
||||
{
|
||||
if (unwraps.incrementAndGet() == unwrapCount)
|
||||
action.accept(sslEngine);
|
||||
return super.unwrap(sslEngine, input, output);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
ScheduledThreadPoolExecutor schedulerService = new ScheduledThreadPoolExecutor(1);
|
||||
schedulerService.setRemoveOnCancelPolicy(true);
|
||||
scenario.client.setScheduler(new ScheduledExecutorScheduler(schedulerService));
|
||||
scenario.client.start();
|
||||
|
||||
int count = 10;
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
assertThrows(ExecutionException.class, () -> scenario.client.newRequest(scenario.newURI())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send()
|
||||
);
|
||||
}
|
||||
|
||||
// There should be a task scheduled by HttpDestination
|
||||
// to expire HttpExchanges while they are queued.
|
||||
// There may be also a task for the idle timeout of the connection.
|
||||
List<Runnable> tasks = schedulerService.shutdownNow();
|
||||
assertThat(tasks.toString(), tasks.size(), allOf(greaterThan(0), lessThan(count + 1)));
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface TLSHandshakeAction
|
||||
{
|
||||
void accept(SSLEngine sslEngine) throws SSLException;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue