jetty-9 - Second take at open/close refactoring.

This commit is contained in:
Simone Bordet 2012-09-19 13:27:10 +02:00
parent f257f4b2bb
commit 44b3bb067c
7 changed files with 226 additions and 36 deletions

View File

@ -280,7 +280,6 @@ public class HttpConnection extends AbstractConnection implements Connection
LOG.debug("{} oshut", this);
getEndPoint().close();
LOG.debug("{} closed", this);
client.getSelectorManager().connectionClosed(this);
}
@Override

View File

@ -146,6 +146,7 @@ public class SslConnection extends AbstractConnection
// Begin the handshake
_sslEngine.beginHandshake();
super.onOpen();
getDecryptedEndPoint().getConnection().onOpen();
}
catch (SSLException x)
{
@ -440,7 +441,6 @@ public class SslConnection extends AbstractConnection
if (a.getInputBufferSize()<_sslEngine.getSession().getApplicationBufferSize())
a.setInputBufferSize(_sslEngine.getSession().getApplicationBufferSize());
}
super.setConnection(connection);
}

View File

@ -18,14 +18,17 @@
package org.eclipse.jetty.server;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayUtil;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
public abstract class AbstractConnectionFactory extends AggregateLifeCycle implements ConnectionFactory
{
final String _protocol;
int _inputbufferSize=8192;
private final String _protocol;
private int _inputbufferSize = 8192;
protected AbstractConnectionFactory(String protocol)
{
@ -48,6 +51,19 @@ public abstract class AbstractConnectionFactory extends AggregateLifeCycle imple
_inputbufferSize=size;
}
protected void configureConnection(Connection connection, Connector connector, EndPoint endPoint)
{
if (connection instanceof AbstractConnection)
((AbstractConnection)connection).setInputBufferSize(getInputBufferSize());
if (connector instanceof AggregateLifeCycle)
{
AggregateLifeCycle aggregate = (AggregateLifeCycle)connector;
for (Connection.Listener listener : aggregate.getBeans(Connection.Listener.class))
connection.addListener(listener);
}
}
@Override
public String toString()
{
@ -73,6 +89,4 @@ public abstract class AbstractConnectionFactory extends AggregateLifeCycle imple
return ArrayUtil.prependToArray(new SslConnectionFactory(sslContextFactory,factories[0].getProtocol()),factories,ConnectionFactory.class);
}
}

View File

@ -52,7 +52,7 @@ public class HttpConnectionFactory extends AbstractConnectionFactory implements
public Connection newConnection(Connector connector, EndPoint endPoint)
{
HttpConnection connection = new HttpConnection(_config, connector, endPoint);
connection.setInputBufferSize(getInputBufferSize()); // TODO constructor injection
configureConnection(connection, connector, endPoint);
return connection;
}

View File

@ -357,19 +357,19 @@ public class ServerConnector extends AbstractNetworkConnector
getExecutor().execute(task);
}
@Override
public void connectionOpened(Connection connection)
{
// TODO
// TODO
// @Override
// public void connectionOpened(Connection connection)
// {
// ServerConnector.this.connectionOpened(connection);
}
// }
@Override
public void connectionClosed(Connection connection)
{
// TODO
// TODO
// @Override
// public void connectionClosed(Connection connection)
// {
// ServerConnector.this.connectionClosed(connection);
}
// }
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
@ -383,5 +383,4 @@ public class ServerConnector extends AbstractNetworkConnector
return getDefaultConnectionFactory().newConnection(ServerConnector.this, endpoint);
}
}
}

View File

@ -37,7 +37,7 @@ public class SslConnectionFactory extends AbstractConnectionFactory
public SslConnectionFactory()
{
this(null,HttpVersion.HTTP_1_1.asString());
this(HttpVersion.HTTP_1_1.asString());
}
public SslConnectionFactory(@Name("next") String nextProtocol)
@ -58,8 +58,6 @@ public class SslConnectionFactory extends AbstractConnectionFactory
return _sslContextFactory;
}
@Override
protected void doStart() throws Exception
{
@ -80,15 +78,13 @@ public class SslConnectionFactory extends AbstractConnectionFactory
engine.setUseClientMode(false);
SslConnection sslConnection = new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine);
sslConnection.setInputBufferSize(getInputBufferSize());
EndPoint decrypted_endp = sslConnection.getDecryptedEndPoint();
configureConnection(sslConnection, connector, endPoint);
ConnectionFactory next = connector.getConnectionFactory(_nextProtocol);
Connection connection = next.newConnection(connector, decrypted_endp);
decrypted_endp.setConnection(connection);
EndPoint decryptedEndPoint = sslConnection.getDecryptedEndPoint();
Connection connection = next.newConnection(connector, decryptedEndPoint);
decryptedEndPoint.setConnection(connection);
// TODO
// ((AbstractConnector)connector).connectionOpened(connection);
return sslConnection;
}

View File

@ -0,0 +1,182 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.toolchain.test.http.SimpleHttpResponse;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class ConnectionOpenCloseTest extends AbstractHttpTest
{
public ConnectionOpenCloseTest()
{
super(HttpVersion.HTTP_1_1.asString());
}
@Slow
@Test
public void testOpenClose() throws Exception
{
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});
server.start();
final AtomicInteger callbacks = new AtomicInteger();
final CountDownLatch openLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);
connector.addBean(new Connection.Listener()
{
@Override
public void onOpened(Connection connection)
{
callbacks.incrementAndGet();
openLatch.countDown();
}
@Override
public void onClosed(Connection connection)
{
callbacks.incrementAndGet();
closeLatch.countDown();
}
});
Socket socket = new Socket("localhost", connector.getLocalPort());
socket.setSoTimeout((int)connector.getIdleTimeout());
OutputStream output = socket.getOutputStream();
output.write(("" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Connection: close\r\n" +
"\r\n").getBytes("UTF-8"));
output.flush();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
SimpleHttpResponse response = httpParser.readResponse(reader);
Assert.assertEquals("200", response.getCode());
Assert.assertEquals(-1, reader.read());
socket.close();
Assert.assertTrue(openLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
// Wait some time to see if the callbacks are called too many times
TimeUnit.SECONDS.sleep(1);
Assert.assertEquals(2, callbacks.get());
}
@Slow
@Test
public void testSSLOpenClose() throws Exception
{
SslContextFactory sslContextFactory = new SslContextFactory();
File keystore = MavenTestingUtils.getTestResourceFile("keystore");
sslContextFactory.setKeyStoreResource(Resource.newResource(keystore));
sslContextFactory.setKeyStorePassword("storepwd");
sslContextFactory.setKeyManagerPassword("keypwd");
server.addBean(sslContextFactory);
server.removeConnector(connector);
connector = new ServerConnector(server, sslContextFactory);
server.addConnector(connector);
server.setHandler(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
}
});
server.start();
final AtomicInteger callbacks = new AtomicInteger();
final CountDownLatch openLatch = new CountDownLatch(1);
final CountDownLatch closeLatch = new CountDownLatch(1);
connector.addBean(new Connection.Listener()
{
@Override
public void onOpened(Connection connection)
{
callbacks.incrementAndGet();
openLatch.countDown();
}
@Override
public void onClosed(Connection connection)
{
callbacks.incrementAndGet();
closeLatch.countDown();
}
});
Socket socket = sslContextFactory.getSslContext().getSocketFactory().createSocket("localhost", connector.getLocalPort());
socket.setSoTimeout((int)connector.getIdleTimeout());
OutputStream output = socket.getOutputStream();
output.write(("" +
"GET / HTTP/1.1\r\n" +
"Host: localhost:" + connector.getLocalPort() + "\r\n" +
"Connection: close\r\n" +
"\r\n").getBytes("UTF-8"));
output.flush();
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
SimpleHttpResponse response = httpParser.readResponse(reader);
Assert.assertEquals("200", response.getCode());
Assert.assertEquals(-1, reader.read());
socket.close();
Assert.assertTrue(openLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
// Wait some time to see if the callbacks are called too many times
TimeUnit.SECONDS.sleep(1);
Assert.assertEquals(4, callbacks.get());
}
}