398872 - SslConnection should not be notified of idle timeouts. First solution.
This commit is contained in:
parent
b5df3a6b76
commit
b2f3852fb3
|
@ -871,6 +871,16 @@ public class HttpClient extends ContainerLifeCycle
|
|||
return encodingField;
|
||||
}
|
||||
|
||||
protected HttpConnection newHttpConnection(HttpClient httpClient, EndPoint endPoint, HttpDestination destination)
|
||||
{
|
||||
return new HttpConnection(httpClient, endPoint, destination);
|
||||
}
|
||||
|
||||
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
|
||||
{
|
||||
return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
|
@ -916,12 +926,9 @@ public class HttpClient extends ContainerLifeCycle
|
|||
SSLEngine engine = sslContextFactory.newSSLEngine(endPoint.getRemoteAddress());
|
||||
engine.setUseClientMode(true);
|
||||
|
||||
SslConnection sslConnection = new SslConnection(getByteBufferPool(), getExecutor(), endPoint, engine);
|
||||
// TODO: configureConnection => implies we should use SslConnectionFactory to do it
|
||||
|
||||
SslConnection sslConnection = newSslConnection(HttpClient.this, endPoint, engine);
|
||||
EndPoint appEndPoint = sslConnection.getDecryptedEndPoint();
|
||||
HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint, destination);
|
||||
// TODO: configureConnection, see above
|
||||
HttpConnection connection = newHttpConnection(HttpClient.this, appEndPoint, destination);
|
||||
|
||||
appEndPoint.setConnection(connection);
|
||||
callback.promise.succeeded(connection);
|
||||
|
@ -931,8 +938,7 @@ public class HttpClient extends ContainerLifeCycle
|
|||
}
|
||||
else
|
||||
{
|
||||
HttpConnection connection = new HttpConnection(HttpClient.this, endPoint, destination);
|
||||
// TODO: configureConnection, see above
|
||||
HttpConnection connection = newHttpConnection(HttpClient.this, endPoint, destination);
|
||||
callback.promise.succeeded(connection);
|
||||
return connection;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.io.IOException;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
@ -34,10 +36,13 @@ import org.eclipse.jetty.client.api.Response;
|
|||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BufferingResponseListener;
|
||||
import org.eclipse.jetty.client.util.InputStreamContentProvider;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ssl.SslConnection;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.toolchain.test.annotation.Slow;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -223,6 +228,46 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIdleTimeout() throws Throwable
|
||||
{
|
||||
long timeout = 1000;
|
||||
start(new TimeoutHandler(2 * timeout));
|
||||
client.stop();
|
||||
final AtomicBoolean sslIdle = new AtomicBoolean();
|
||||
client = new HttpClient(sslContextFactory)
|
||||
{
|
||||
@Override
|
||||
protected SslConnection newSslConnection(HttpClient httpClient, EndPoint endPoint, SSLEngine engine)
|
||||
{
|
||||
return new SslConnection(httpClient.getByteBufferPool(), httpClient.getExecutor(), endPoint, engine)
|
||||
{
|
||||
@Override
|
||||
protected boolean onReadTimeout()
|
||||
{
|
||||
sslIdle.set(true);
|
||||
return super.onReadTimeout();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
client.setIdleTimeout(timeout);
|
||||
client.start();
|
||||
|
||||
try
|
||||
{
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.send();
|
||||
Assert.fail();
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
Assert.assertFalse(sslIdle.get());
|
||||
Assert.assertThat(x.getCause(), Matchers.instanceOf(TimeoutException.class));
|
||||
}
|
||||
}
|
||||
|
||||
private class TimeoutHandler extends AbstractHandler
|
||||
{
|
||||
private final long timeout;
|
||||
|
|
|
@ -30,12 +30,12 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** An Abstract implementation of an Idle Timeout.
|
||||
*
|
||||
*
|
||||
* This implementation is optimised that timeout operations are not cancelled on
|
||||
* every operation. Rather timeout are allowed to expire and a check is then made
|
||||
* to see when the last operation took place. If the idle timeout has not expired,
|
||||
* every operation. Rather timeout are allowed to expire and a check is then made
|
||||
* to see when the last operation took place. If the idle timeout has not expired,
|
||||
* the timeout is rescheduled for the earliest possible time a timeout could occur.
|
||||
*
|
||||
*
|
||||
*/
|
||||
public abstract class IdleTimeout
|
||||
{
|
||||
|
@ -44,7 +44,7 @@ public abstract class IdleTimeout
|
|||
private final AtomicReference<Scheduler.Task> _timeout = new AtomicReference<>();
|
||||
private volatile long _idleTimeout;
|
||||
private volatile long _idleTimestamp=System.currentTimeMillis();
|
||||
|
||||
|
||||
private final Runnable _idleTask = new Runnable()
|
||||
{
|
||||
@Override
|
||||
|
@ -63,12 +63,12 @@ public abstract class IdleTimeout
|
|||
{
|
||||
_scheduler=scheduler;
|
||||
}
|
||||
|
||||
|
||||
public long getIdleTimestamp()
|
||||
{
|
||||
return _idleTimestamp;
|
||||
}
|
||||
|
||||
|
||||
public long getIdleTimeout()
|
||||
{
|
||||
return _idleTimeout;
|
||||
|
@ -78,22 +78,22 @@ public abstract class IdleTimeout
|
|||
{
|
||||
long old=_idleTimeout;
|
||||
_idleTimeout = idleTimeout;
|
||||
|
||||
|
||||
// Do we have an old timeout
|
||||
if (old>0)
|
||||
{
|
||||
// if the old was less than or equal to the new timeout, then nothing more to do
|
||||
if (old<=_idleTimeout)
|
||||
if (old<=idleTimeout)
|
||||
return;
|
||||
|
||||
|
||||
// old timeout is too long, so cancel it.
|
||||
Scheduler.Task oldTimeout = _timeout.getAndSet(null);
|
||||
if (oldTimeout != null)
|
||||
oldTimeout.cancel();
|
||||
}
|
||||
|
||||
|
||||
// If we have a new timeout, then check and reschedule
|
||||
if (_idleTimeout>0 && isOpen())
|
||||
if (idleTimeout>0 && isOpen())
|
||||
_idleTask.run();
|
||||
}
|
||||
|
||||
|
@ -103,7 +103,7 @@ public abstract class IdleTimeout
|
|||
{
|
||||
_idleTimestamp=System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
||||
private void scheduleIdleTimeout(long delay)
|
||||
{
|
||||
Scheduler.Task newTimeout = null;
|
||||
|
@ -113,20 +113,20 @@ public abstract class IdleTimeout
|
|||
if (oldTimeout != null)
|
||||
oldTimeout.cancel();
|
||||
}
|
||||
|
||||
|
||||
public void onOpen()
|
||||
{
|
||||
if (_idleTimeout>0)
|
||||
_idleTask.run();
|
||||
}
|
||||
|
||||
|
||||
protected void close()
|
||||
{
|
||||
Scheduler.Task oldTimeout = _timeout.getAndSet(null);
|
||||
if (oldTimeout != null)
|
||||
oldTimeout.cancel();
|
||||
}
|
||||
|
||||
|
||||
protected long checkIdleTimeout()
|
||||
{
|
||||
if (isOpen())
|
||||
|
@ -158,14 +158,14 @@ public abstract class IdleTimeout
|
|||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** This abstract method is called when the idle timeout has expired.
|
||||
* @param timeout a TimeoutException
|
||||
*/
|
||||
abstract protected void onIdleExpired(TimeoutException timeout);
|
||||
|
||||
|
||||
|
||||
|
||||
/* ------------------------------------------------------------ */
|
||||
/** This abstract method should be called to check if idle timeouts
|
||||
* should still be checked.
|
||||
|
|
|
@ -100,7 +100,7 @@ public class SslConnection extends AbstractConnection
|
|||
_decryptedEndPoint.getWriteFlusher().completeWrite();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine)
|
||||
{
|
||||
// This connection does not execute calls to onfillable, so they will be called by the selector thread.
|
||||
|
@ -109,7 +109,7 @@ public class SslConnection extends AbstractConnection
|
|||
this._bufferPool = byteBufferPool;
|
||||
this._sslEngine = sslEngine;
|
||||
this._decryptedEndPoint = newDecryptedEndPoint();
|
||||
|
||||
|
||||
if (endPoint instanceof SocketBased)
|
||||
{
|
||||
try
|
||||
|
@ -220,9 +220,6 @@ public class SslConnection extends AbstractConnection
|
|||
// the decrypted readInterest and/or writeFlusher so that they will attempt
|
||||
// to do the fill and/or flush again and these calls will do the actually
|
||||
// handle the cause.
|
||||
|
||||
super.onFillInterestedFailed(cause);
|
||||
|
||||
synchronized(_decryptedEndPoint)
|
||||
{
|
||||
_decryptedEndPoint.getFillInterest().onFail(cause);
|
||||
|
@ -482,13 +479,13 @@ public class SslConnection extends AbstractConnection
|
|||
LOG.debug("{} filled {} encrypted bytes", SslConnection.this, net_filled);
|
||||
if (net_filled > 0)
|
||||
_underFlown = false;
|
||||
|
||||
|
||||
// Let's try the SSL thang even if we have no net data because in that
|
||||
// case we want to fall through to the handshake handling
|
||||
int pos = BufferUtil.flipToFill(app_in);
|
||||
|
||||
|
||||
SSLEngineResult unwrapResult = _sslEngine.unwrap(_encryptedInput, app_in);
|
||||
|
||||
|
||||
BufferUtil.flipToFlush(app_in, pos);
|
||||
if (DEBUG)
|
||||
LOG.debug("{} unwrap {}", SslConnection.this, unwrapResult);
|
||||
|
@ -663,7 +660,7 @@ public class SslConnection extends AbstractConnection
|
|||
BufferUtil.flipToFlush(_encryptedOutput, pos);
|
||||
if (wrapResult.bytesConsumed()>0)
|
||||
consumed+=wrapResult.bytesConsumed();
|
||||
|
||||
|
||||
boolean all_consumed=true;
|
||||
// clear empty buffers to prevent position creeping up the buffer
|
||||
for (ByteBuffer b : appOuts)
|
||||
|
|
|
@ -20,11 +20,9 @@ package org.eclipse.jetty.io;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.Socket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLEngineResult;
|
||||
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
|
||||
|
@ -42,9 +40,7 @@ import org.junit.Test;
|
|||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
||||
public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
|
||||
|
@ -226,51 +222,6 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
|
|||
super.testIdle();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testBlockedReadIdle() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
OutputStream clientOutputStream = client.getOutputStream();
|
||||
|
||||
client.setSoTimeout(5000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
||||
_manager.accept(server);
|
||||
|
||||
// Write client to server
|
||||
clientOutputStream.write("HelloWorld".getBytes("UTF-8"));
|
||||
|
||||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
assertTrue(b>0);
|
||||
assertEquals(c,(char)b);
|
||||
}
|
||||
|
||||
assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
|
||||
_lastEndPoint.setIdleTimeout(500);
|
||||
|
||||
// Write 8 and cause block waiting for 10
|
||||
_blockAt=10;
|
||||
clientOutputStream.write("12345678".getBytes("UTF-8"));
|
||||
clientOutputStream.flush();
|
||||
|
||||
// read until idle shutdown received
|
||||
long start=System.currentTimeMillis();
|
||||
int b=client.getInputStream().read();
|
||||
assertEquals(-1,b);
|
||||
long idle=System.currentTimeMillis()-start;
|
||||
assertTrue(idle>400);
|
||||
assertTrue(idle<2000);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertFalse(_lastEndPoint.isOpen());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Override
|
||||
@Stress("Requires a relatively idle (network wise) environment")
|
||||
|
|
|
@ -18,12 +18,6 @@
|
|||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -52,6 +46,12 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class SelectChannelEndPointTest
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(SelectChannelEndPointTest.class);
|
||||
|
@ -220,7 +220,7 @@ public class SelectChannelEndPointTest
|
|||
{
|
||||
Socket client = newClient();
|
||||
|
||||
client.setSoTimeout(60000);
|
||||
client.setSoTimeout(60000);
|
||||
|
||||
SocketChannel server = _connector.accept();
|
||||
server.configureBlocking(false);
|
||||
|
@ -425,6 +425,7 @@ public class SelectChannelEndPointTest
|
|||
public void testBlockedReadIdle() throws Exception
|
||||
{
|
||||
Socket client = newClient();
|
||||
InputStream clientInputStream = client.getInputStream();
|
||||
OutputStream clientOutputStream = client.getOutputStream();
|
||||
|
||||
client.setSoTimeout(5000);
|
||||
|
@ -440,7 +441,7 @@ public class SelectChannelEndPointTest
|
|||
// Verify echo server to client
|
||||
for (char c : "HelloWorld".toCharArray())
|
||||
{
|
||||
int b = client.getInputStream().read();
|
||||
int b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
|
@ -456,7 +457,7 @@ public class SelectChannelEndPointTest
|
|||
|
||||
// read until idle shutdown received
|
||||
long start = System.currentTimeMillis();
|
||||
int b = client.getInputStream().read();
|
||||
int b = clientInputStream.read();
|
||||
assertEquals('E', b);
|
||||
long idle = System.currentTimeMillis() - start;
|
||||
assertTrue(idle > idleTimeout / 2);
|
||||
|
@ -464,10 +465,12 @@ public class SelectChannelEndPointTest
|
|||
|
||||
for (char c : "E: 12345678".toCharArray())
|
||||
{
|
||||
b = client.getInputStream().read();
|
||||
b = clientInputStream.read();
|
||||
assertTrue(b > 0);
|
||||
assertEquals(c, (char)b);
|
||||
}
|
||||
b = clientInputStream.read();
|
||||
assertEquals(-1,b);
|
||||
|
||||
// But endpoint is still open.
|
||||
if(_lastEndPoint.isOpen())
|
||||
|
|
Loading…
Reference in New Issue