Sending GOAWAY when stopping SPDYServerConnector and SPDYClient.Factory.

This commit is contained in:
Simone Bordet 2012-02-26 00:54:14 +01:00
parent 6ed95152dd
commit 10d5d4acac
5 changed files with 248 additions and 6 deletions

View File

@ -26,9 +26,13 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
@ -168,6 +172,7 @@ public class SPDYClient
public static class Factory extends AggregateLifeCycle
{
private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ThreadPool threadPool;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
@ -209,6 +214,13 @@ public class SPDYClient
return new SPDYClient(version, this);
}
@Override
protected void doStop() throws Exception
{
closeConnections();
super.doStop();
}
public void join() throws InterruptedException
{
threadPool.join();
@ -227,6 +239,31 @@ public class SPDYClient
return null;
}
private boolean sessionOpened(Session session)
{
// Add sessions only if the factory is not stopping
return isRunning() && sessions.offer(session);
}
private boolean sessionClosed(Session session)
{
// Remove sessions only if the factory is not stopping
// to avoid concurrent removes during iterations
return isRunning() && sessions.remove(session);
}
private void closeConnections()
{
for (Session session : sessions)
session.goAway();
sessions.clear();
}
protected Collection<Session> getSessions()
{
return Collections.unmodifiableCollection(sessions);
}
private class ClientSelectorManager extends SelectorManager
{
@Override
@ -373,7 +410,9 @@ public class SPDYClient
Parser parser = new Parser(compressionFactory.newDecompressor());
Generator generator = new Generator(compressionFactory.newCompressor());
SPDYAsyncConnection connection = new SPDYAsyncConnection(endPoint, parser);
Factory factory = sessionPromise.client.factory;
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(sessionPromise.client.version, connection, 1, sessionPromise.listener, generator);
@ -381,7 +420,27 @@ public class SPDYClient
sessionPromise.completed(session);
connection.setSession(session);
factory.sessionOpened(session);
return connection;
}
private class ClientSPDYAsyncConnection extends SPDYAsyncConnection
{
private final Factory factory;
public ClientSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, Factory factory)
{
super(endPoint, parser);
this.factory = factory;
}
@Override
public void onClose()
{
super.onClose();
factory.sessionClosed(getSession());
}
}
}
}

View File

@ -18,9 +18,13 @@ package org.eclipse.jetty.spdy;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
@ -30,6 +34,7 @@ import org.eclipse.jetty.io.nio.SslConnection;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -37,6 +42,7 @@ public class SPDYServerConnector extends SelectChannelConnector
{
// Order is important on server side, so we use a LinkedHashMap
private final Map<String, AsyncConnectionFactory> factories = new LinkedHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final SslContextFactory sslContextFactory;
private final AsyncConnectionFactory defaultConnectionFactory;
@ -54,6 +60,13 @@ public class SPDYServerConnector extends SelectChannelConnector
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
}
@Override
protected void doStop() throws Exception
{
closeSessions();
super.doStop();
}
public AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
{
synchronized (factories)
@ -115,7 +128,7 @@ public class SPDYServerConnector extends SelectChannelConnector
public void unsupported()
{
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, null);
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, this);
sslEndPoint.setConnection(connection);
}
@ -129,7 +142,7 @@ public class SPDYServerConnector extends SelectChannelConnector
public void protocolSelected(String protocol)
{
AsyncConnectionFactory connectionFactory = getAsyncConnectionFactory(protocol);
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, null);
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, sslEndPoint, this);
sslEndPoint.setConnection(connection);
}
});
@ -144,7 +157,7 @@ public class SPDYServerConnector extends SelectChannelConnector
else
{
AsyncConnectionFactory connectionFactory = getDefaultAsyncConnectionFactory();
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, null);
AsyncConnection connection = connectionFactory.newAsyncConnection(channel, endPoint, this);
endPoint.setConnection(connection);
return connection;
}
@ -170,4 +183,29 @@ public class SPDYServerConnector extends SelectChannelConnector
throw new RuntimeException(x);
}
}
protected boolean sessionOpened(Session session)
{
// Add sessions only if the connector is not stopping
return isRunning() && sessions.offer(session);
}
protected boolean sessionClosed(Session session)
{
// Remove sessions only if the connector is not stopping
// to avoid concurrent removes during iterations
return isRunning() && sessions.remove(session);
}
private void closeSessions()
{
for (Session session : sessions)
session.goAway();
sessions.clear();
}
protected Collection<Session> getSessions()
{
return Collections.unmodifiableCollection(sessions);
}
}

View File

@ -53,13 +53,17 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
if (listener == null)
listener = newServerSessionFrameListener(endPoint, attachment);
ServerSPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener);
SPDYServerConnector connector = (SPDYServerConnector)attachment;
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);
connector.sessionOpened(session);
return connection;
}
@ -71,12 +75,14 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
private static class ServerSPDYAsyncConnection extends SPDYAsyncConnection
{
private final ServerSessionFrameListener listener;
private final SPDYServerConnector connector;
private volatile boolean connected;
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, ServerSessionFrameListener listener)
private ServerSPDYAsyncConnection(AsyncEndPoint endPoint, Parser parser, ServerSessionFrameListener listener, SPDYServerConnector connector)
{
super(endPoint, parser);
this.listener = listener;
this.connector = connector;
}
@Override
@ -91,5 +97,12 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
}
return super.handle();
}
@Override
public void onClose()
{
super.onClose();
connector.sessionClosed(getSession());
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Test;
public class SPDYClientFactoryTest extends AbstractTest
{
@Test
public void testStoppingClientFactorySendsGoAway() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
}), null);
// Sleep a while to avoid the factory is
// stopped before a session can be opened
TimeUnit.SECONDS.sleep(1);
clientFactory.stop();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(clientFactory.getSessions().isEmpty());
}
@Test
public void testSessionClosedIsRemovedFromClientFactory() throws Exception
{
Session session = startClient(startServer(null), null);
session.goAway().get(5, TimeUnit.SECONDS);
// Sleep a while to allow the factory to remove the session
// since it is done asynchronously by the selector thread
TimeUnit.SECONDS.sleep(1);
Assert.assertTrue(clientFactory.getSessions().isEmpty());
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.junit.Test;
public class SPDYServerConnectorTest extends AbstractTest
{
@Test
public void testStoppingServerConnectorSendsGoAway() throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
startClient(startServer(null), new SessionFrameListener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayInfo goAwayInfo)
{
latch.countDown();
}
});
// Sleep a while to avoid the connector is
// stopped before a session can be opened
TimeUnit.SECONDS.sleep(1);
connector.stop();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(connector.getSessions().isEmpty());
}
@Test
public void testSessionClosedIsRemovedFromServerConnector() throws Exception
{
Session session = startClient(startServer(null), null);
session.goAway().get(5, TimeUnit.SECONDS);
// Sleep a while to allow the connector to remove the session
// since it is done asynchronously by the selector thread
TimeUnit.SECONDS.sleep(1);
Assert.assertTrue(connector.getSessions().isEmpty());
}
}