diff --git a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index 44bf9f3ea1d..d1ef41c913a 100644 --- a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -26,9 +26,13 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; @@ -168,6 +172,7 @@ public class SPDYClient public static class Factory extends AggregateLifeCycle { private final Map factories = new ConcurrentHashMap<>(); + private final Queue sessions = new ConcurrentLinkedQueue<>(); private final ThreadPool threadPool; private final SslContextFactory sslContextFactory; private final SelectorManager selector; @@ -209,6 +214,13 @@ public class SPDYClient return new SPDYClient(version, this); } + @Override + protected void doStop() throws Exception + { + closeConnections(); + super.doStop(); + } + public void join() throws InterruptedException { threadPool.join(); @@ -227,6 +239,31 @@ public class SPDYClient return null; } + private boolean sessionOpened(Session session) + { + // Add sessions only if the factory is not stopping + return isRunning() && sessions.offer(session); + } + + private boolean sessionClosed(Session session) + { + // Remove sessions only if the factory is not stopping + // to avoid concurrent removes during iterations + return isRunning() && sessions.remove(session); + } + + private void closeConnections() + { + for (Session session : sessions) + session.goAway(); + sessions.clear(); + } + + protected Collection getSessions() + { + return Collections.unmodifiableCollection(sessions); + } + private class ClientSelectorManager extends SelectorManager { @Override @@ -373,7 +410,9 @@ public class SPDYClient Parser parser = new Parser(compressionFactory.newDecompressor()); Generator generator = new Generator(compressionFactory.newCompressor()); - SPDYAsyncConnection connection = new SPDYAsyncConnection(endPoint, parser); + Factory factory = sessionPromise.client.factory; + + SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory); endPoint.setConnection(connection); StandardSession session = new StandardSession(sessionPromise.client.version, connection, 1, sessionPromise.listener, generator); @@ -381,7 +420,27 @@ public class SPDYClient sessionPromise.completed(session); connection.setSession(session); + factory.sessionOpened(session); + return connection; } + + private class ClientSPDYAsyncConnection extends SPDYAsyncConnection + { + private final Factory factory; + + public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, Factory factory) + { + super(endPoint, parser); + this.factory = factory; + } + + @Override + public void onClose() + { + super.onClose(); + factory.sessionClosed(getSession()); + } + } } } diff --git a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java index c4e73c3e579..3183bc2d669 100644 --- a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java +++ b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java @@ -18,9 +18,13 @@ package org.eclipse.jetty.spdy; import java.nio.channels.SocketChannel; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; @@ -30,6 +34,7 @@ import org.eclipse.jetty.io.nio.SslConnection; import org.eclipse.jetty.npn.NextProtoNego; import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.spdy.api.SPDY; +import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.util.ssl.SslContextFactory; @@ -37,6 +42,7 @@ public class SPDYServerConnector extends SelectChannelConnector { // Order is important on server side, so we use a LinkedHashMap private final Map factories = new LinkedHashMap<>(); + private final Queue sessions = new ConcurrentLinkedQueue<>(); private final SslContextFactory sslContextFactory; private final AsyncConnectionFactory defaultConnectionFactory; @@ -54,6 +60,13 @@ public class SPDYServerConnector extends SelectChannelConnector putAsyncConnectionFactory("spdy/2", defaultConnectionFactory); } + @Override + protected void doStop() throws Exception + { + closeSessions(); + super.doStop(); + } + public AsyncConnectionFactory getAsyncConnectionFactory(String protocol) { synchronized (factories) @@ -115,7 +128,7 @@ public class SPDYServerConnector extends SelectChannelConnector public void unsupported() { AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory(); - AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, null); + AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, this); sslEndPoint.setConnection(connection); } @@ -129,7 +142,7 @@ public class SPDYServerConnector extends SelectChannelConnector public void protocolSelected(String protocol) { AsyncConnectionFactory connectionFactory = getAsyncConnectionFactory(protocol); - AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, null); + AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, this); sslEndPoint.setConnection(connection); } }); @@ -144,7 +157,7 @@ public class SPDYServerConnector extends SelectChannelConnector else { AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory(); - AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, null); + AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this); endPoint.setConnection(connection); return connection; } @@ -170,4 +183,29 @@ public class SPDYServerConnector extends SelectChannelConnector throw new RuntimeException(x); } } + + protected boolean sessionOpened(Session session) + { + // Add sessions only if the connector is not stopping + return isRunning() && sessions.offer(session); + } + + protected boolean sessionClosed(Session session) + { + // Remove sessions only if the connector is not stopping + // to avoid concurrent removes during iterations + return isRunning() && sessions.remove(session); + } + + private void closeSessions() + { + for (Session session : sessions) + session.goAway(); + sessions.clear(); + } + + protected Collection getSessions() + { + return Collections.unmodifiableCollection(sessions); + } } diff --git a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java index d0aa59bbb5a..5ca33e36f78 100644 --- a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java +++ b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java @@ -53,13 +53,17 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory if (listener == null) listener = newServerSessionFrameListener(endPoint, attachment); - ServerSPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener); + SPDYServerConnector connector = (SPDYServerConnector)attachment; + + SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector); endPoint.setConnection(connection); final StandardSession session = new StandardSession(version, connection, 2, listener, generator); parser.addListener(session); connection.setSession(session); + connector.sessionOpened(session); + return connection; } @@ -71,12 +75,14 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection { private final ServerSessionFrameListener listener; + private final SPDYServerConnector connector; private volatile boolean connected; - private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, ServerSessionFrameListener listener) + private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector) { super(endPoint, parser); this.listener = listener; + this.connector = connector; } @Override @@ -91,5 +97,12 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory } return super.handle(); } + + @Override + public void onClose() + { + super.onClose(); + connector.sessionClosed(getSession()); + } } } diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SPDYClientFactoryTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SPDYClientFactoryTest.java new file mode 100644 index 00000000000..8124d674a8e --- /dev/null +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SPDYClientFactoryTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.Assert; +import org.eclipse.jetty.spdy.api.GoAwayInfo; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.junit.Test; + +public class SPDYClientFactoryTest extends AbstractTest +{ + @Test + public void testStoppingClientFactorySendsGoAway() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayInfo goAwayInfo) + { + latch.countDown(); + } + }), null); + + // Sleep a while to avoid the factory is + // stopped before a session can be opened + TimeUnit.SECONDS.sleep(1); + + clientFactory.stop(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(clientFactory.getSessions().isEmpty()); + } + + @Test + public void testSessionClosedIsRemovedFromClientFactory() throws Exception + { + Session session = startClient(startServer(null), null); + + session.goAway().get(5, TimeUnit.SECONDS); + + // Sleep a while to allow the factory to remove the session + // since it is done asynchronously by the selector thread + TimeUnit.SECONDS.sleep(1); + + Assert.assertTrue(clientFactory.getSessions().isEmpty()); + } +} diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SPDYServerConnectorTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SPDYServerConnectorTest.java new file mode 100644 index 00000000000..d168b11d783 --- /dev/null +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SPDYServerConnectorTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.Assert; +import org.eclipse.jetty.spdy.api.GoAwayInfo; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.SessionFrameListener; +import org.junit.Test; + +public class SPDYServerConnectorTest extends AbstractTest +{ + @Test + public void testStoppingServerConnectorSendsGoAway() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + startClient(startServer(null), new SessionFrameListener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayInfo goAwayInfo) + { + latch.countDown(); + } + }); + + // Sleep a while to avoid the connector is + // stopped before a session can be opened + TimeUnit.SECONDS.sleep(1); + + connector.stop(); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(connector.getSessions().isEmpty()); + } + + @Test + public void testSessionClosedIsRemovedFromServerConnector() throws Exception + { + Session session = startClient(startServer(null), null); + + session.goAway().get(5, TimeUnit.SECONDS); + + // Sleep a while to allow the connector to remove the session + // since it is done asynchronously by the selector thread + TimeUnit.SECONDS.sleep(1); + + Assert.assertTrue(connector.getSessions().isEmpty()); + } +}