From 980d7285028f8f59036afc7937eec6acafb110e0 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 28 Feb 2012 15:32:51 +0100 Subject: [PATCH] Implemented asynchronous notifications of frame listener callbacks. --- .../java/org/eclipse/jetty/spdy/ISession.java | 2 + .../java/org/eclipse/jetty/spdy/IStream.java | 2 + .../eclipse/jetty/spdy/StandardSession.java | 210 ++++++++++++------ .../eclipse/jetty/spdy/StandardStream.java | 51 ++++- .../org/eclipse/jetty/spdy/api/RstInfo.java | 6 + .../org/eclipse/jetty/spdy/api/Settings.java | 10 +- .../spdy/generator/SettingsGenerator.java | 6 +- .../eclipse/jetty/spdy/AsyncTimeoutTest.java | 7 +- .../jetty/spdy/api/ClientUsageTest.java | 8 +- .../spdy/http/HTTPSPDYServerConnector.java | 14 +- .../ServerHTTPSPDYAsyncConnectionFactory.java | 5 +- .../jetty/spdy/http/AbstractHTTPSPDYTest.java | 7 +- .../org/eclipse/jetty/spdy/SPDYClient.java | 26 ++- .../jetty/spdy/SPDYServerConnector.java | 31 ++- .../ServerSPDYAsyncConnectionFactory.java | 11 +- .../org/eclipse/jetty/spdy/AbstractTest.java | 5 +- .../eclipse/jetty/spdy/ConcurrentTest.java | 100 +++++++++ .../eclipse/jetty/spdy/SSLSynReplyTest.java | 5 +- .../org/eclipse/jetty/spdy/SettingsTest.java | 12 +- 19 files changed, 397 insertions(+), 121 deletions(-) create mode 100644 spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ConcurrentTest.java diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java index 972d1440f4a..0ea7991bdb1 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/ISession.java @@ -30,6 +30,8 @@ public interface ISession extends Session public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler handler, C context); + public void execute(Runnable task); + public int getWindowSize(); public interface Controller diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java index 12952d42010..94cb6cf1090 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/IStream.java @@ -36,4 +36,6 @@ public interface IStream extends Stream public void handle(ControlFrame frame); public void handle(DataFrame dataFrame, ByteBuffer data); + + public void post(Runnable task); } diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index cabf4996e22..4ea0050e598 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -69,6 +70,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler listeners = new CopyOnWriteArrayList<>(); private final ConcurrentMap streams = new ConcurrentHashMap<>(); private final Deque queue = new LinkedList<>(); + private final Executor threadPool; private final ScheduledExecutorService scheduler; private final short version; private final Controller controller; @@ -82,9 +84,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler controller, int initialStreamId, SessionFrameListener listener, Generator generator) + public StandardSession(short version, Executor threadPool, ScheduledExecutorService scheduler, Controller controller, int initialStreamId, SessionFrameListener listener, Generator generator) { this.version = version; + this.threadPool = threadPool; this.scheduler = scheduler; this.controller = controller; this.streamIds = new AtomicInteger(initialStreamId); @@ -353,8 +356,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler handler, C context) throws StreamException { if (stream != null) - updateLastStreamId(stream); + updateLastStreamId(stream); // TODO: not sure this is right ByteBuffer buffer = generator.control(frame); logger.debug("Queuing {} on {}", frame, stream); ControlFrameBytes frameBytes = new ControlFrameBytes<>(frame, buffer, handler, context); @@ -684,6 +753,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler attributes = new ConcurrentHashMap<>(); + private final Queue queue = new LinkedList<>(); private final ISession session; private final SynStreamFrame frame; private final AtomicInteger windowSize; @@ -53,6 +56,7 @@ public class StandardStream implements IStream private volatile boolean opened; private volatile boolean halfClosed; private volatile boolean closed; + private boolean dispatched; public StandardStream(ISession session, SynStreamFrame frame) { @@ -165,11 +169,17 @@ public class StandardStream implements IStream updateWindowSize(windowUpdate.getWindowDelta()); break; } + case RST_STREAM: + { + // TODO: + break; + } default: { throw new IllegalStateException(); } } + session.flush(); } @Override @@ -191,6 +201,46 @@ public class StandardStream implements IStream // the application listeners because they may block windowUpdate(length); } + session.flush(); + } + + @Override + public void post(Runnable task) + { + synchronized (queue) + { + logger.debug("Posting task {}", task); + queue.offer(task); + dispatch(); + } + } + + private void dispatch() + { + synchronized (queue) + { + if (dispatched) + return; + + final Runnable task = queue.poll(); + if (task != null) + { + dispatched = true; + logger.debug("Dispatching task {}", task); + session.execute(new Runnable() + { + @Override + public void run() + { + logger.debug("Executing task {}", task); + task.run(); + logger.debug("Completing task {}", task); + dispatched = false; + dispatch(); + } + }); + } + } } private void windowUpdate(int delta) @@ -280,7 +330,6 @@ public class StandardStream implements IStream updateCloseState(replyInfo.isClose()); SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders()); session.control(this, frame, timeout, unit, handler, null); - session.flush(); } catch (StreamException x) { diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/RstInfo.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/RstInfo.java index a04ae94add6..f1f5ee482a0 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/RstInfo.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/RstInfo.java @@ -51,4 +51,10 @@ public class RstInfo { return streamStatus; } + + @Override + public String toString() + { + return String.format("RST stream=%d %s", streamId, streamStatus); + } } diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Settings.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Settings.java index 847162fe8d7..8f69a5021de 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Settings.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Settings.java @@ -43,7 +43,7 @@ public class Settings implements Iterable public void put(Setting setting) { - settings.put(setting.getId(), setting); + settings.put(setting.id(), setting); } public Setting remove(ID id) @@ -172,17 +172,17 @@ public class Settings implements Iterable this.value = value; } - public ID getId() + public ID id() { return id; } - public Flag getFlag() + public Flag flag() { return flag; } - public int getValue() + public int value() { return value; } @@ -210,7 +210,7 @@ public class Settings implements Iterable @Override public String toString() { - return String.format("[id=%s,flags=%s:value=%d]", getId(), getFlag(), getValue()); + return String.format("[id=%s,flags=%s:value=%d]", id(), flag(), value()); } } } diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/SettingsGenerator.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/SettingsGenerator.java index 935223ba2be..314d9ae685c 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/SettingsGenerator.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/generator/SettingsGenerator.java @@ -42,12 +42,12 @@ public class SettingsGenerator extends ControlFrameGenerator for (Settings.Setting setting : settings) { - int id = setting.getId().getCode(); - int flags = setting.getFlag().getCode(); + int id = setting.id().getCode(); + int flags = setting.flag().getCode(); int idAndFlags = (id << 8) + flags; idAndFlags = convertIdAndFlags(frame.getVersion(), idAndFlags); buffer.putInt(idAndFlags); - buffer.putInt(setting.getValue()); + buffer.putInt(setting.value()); } buffer.flip(); diff --git a/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java b/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java index 73f5b664cb9..c2ed58ecee0 100644 --- a/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java +++ b/spdy-core/src/test/java/org/eclipse/jetty/spdy/AsyncTimeoutTest.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -42,9 +43,10 @@ public class AsyncTimeoutTest final long timeout = 1000; final TimeUnit unit = TimeUnit.MILLISECONDS; + Executor threadPool = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor()); - Session session = new StandardSession(SPDY.V2, scheduler, new TestController(), 1, null, generator) + Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), 1, null, generator) { @Override public void flush() @@ -85,9 +87,10 @@ public class AsyncTimeoutTest final long timeout = 1000; final TimeUnit unit = TimeUnit.MILLISECONDS; + Executor threadPool = Executors.newCachedThreadPool(); ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor()); - Session session = new StandardSession(SPDY.V2, scheduler, new TestController(), 1, null, generator) + Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), 1, null, generator) { private final AtomicInteger flushes = new AtomicInteger(); diff --git a/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java b/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java index bfc3b780019..6f666e02d37 100644 --- a/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java +++ b/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java @@ -29,7 +29,7 @@ public class ClientUsageTest @Test public void testClientRequestResponseNoBody() throws Exception { - Session session = new StandardSession(SPDY.V2, null, null, 1, null, null); + Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null); session.syn(new SynInfo(true), new StreamFrameListener.Adapter() { @@ -48,7 +48,7 @@ public class ClientUsageTest @Test public void testClientRequestWithBodyResponseNoBody() throws Exception { - Session session = new StandardSession(SPDY.V2, null, null, 1, null, null); + Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null); Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter() { @@ -69,7 +69,7 @@ public class ClientUsageTest @Test public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception { - Session session = new StandardSession(SPDY.V2, null, null, 1, null, null); + Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null); final String context = "context"; session.syn(new SynInfo(false), new StreamFrameListener.Adapter() @@ -104,7 +104,7 @@ public class ClientUsageTest @Test public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception { - Session session = new StandardSession(SPDY.V2, null, null, 1, null, null); + Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null); session.syn(new SynInfo(false), new StreamFrameListener.Adapter() { diff --git a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java index 86a98b8cb85..07c0c59a610 100644 --- a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java +++ b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/HTTPSPDYServerConnector.java @@ -33,14 +33,20 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector public HTTPSPDYServerConnector(SslContextFactory sslContextFactory) { super(null, sslContextFactory); - // Override the "spdy/2" protocol by handling HTTP over SPDY - putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getScheduler(), this)); - // Add the "http/1.1" protocol for browsers that do not support NPN - putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this)); // Override the default connection factory for non-SSL connections defaultConnectionFactory = new ServerHTTPAsyncConnectionFactory(this); } + @Override + protected void doStart() throws Exception + { + super.doStart(); + // Override the "spdy/2" protocol by handling HTTP over SPDY + putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), getScheduler(), this)); + // Add the "http/1.1" protocol for browsers that do not support NPN + putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this)); + } + @Override protected AsyncConnectionFactory getDefaultAsyncConnectionFactory() { diff --git a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java index 426f7dd1b39..38029013461 100644 --- a/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java +++ b/spdy-jetty-http/src/main/java/org/eclipse/jetty/spdy/http/ServerHTTPSPDYAsyncConnectionFactory.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.http; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.http.HttpException; @@ -42,9 +43,9 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnectionFactory.class); private final Connector connector; - public ServerHTTPSPDYAsyncConnectionFactory(short version, ScheduledExecutorService scheduler, Connector connector) + public ServerHTTPSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, Connector connector) { - super(version, scheduler); + super(version, threadPool, scheduler); this.connector = connector; } diff --git a/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java b/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java index 1461cf30e7e..2ea66cbae8f 100644 --- a/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java +++ b/spdy-jetty-http/src/test/java/org/eclipse/jetty/spdy/http/AbstractHTTPSPDYTest.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.spdy.http; import java.net.InetSocketAddress; +import java.util.concurrent.Executor; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; @@ -27,7 +28,6 @@ import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.After; import org.junit.Rule; import org.junit.rules.TestWatchman; @@ -71,7 +71,7 @@ public abstract class AbstractHTTPSPDYTest @Override protected AsyncConnectionFactory getDefaultAsyncConnectionFactory() { - return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getScheduler(), this); + return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), getScheduler(), this); } }; } @@ -88,7 +88,7 @@ public abstract class AbstractHTTPSPDYTest return clientFactory.newSPDYClient(SPDY.V2).connect(socketAddress, listener).get(); } - protected SPDYClient.Factory newSPDYClientFactory(ThreadPool threadPool) + protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool) { return new SPDYClient.Factory(threadPool); } @@ -99,7 +99,6 @@ public abstract class AbstractHTTPSPDYTest if (clientFactory != null) { clientFactory.stop(); - clientFactory.join(); } if (server != null) { diff --git a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java index 5e81bf682c9..fffbf2d4300 100644 --- a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java +++ b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYClient.java @@ -33,8 +33,10 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; @@ -54,7 +56,6 @@ import org.eclipse.jetty.spdy.parser.Parser; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.eclipse.jetty.util.thread.ThreadPool; public class SPDYClient { @@ -176,7 +177,7 @@ public class SPDYClient private final Map factories = new ConcurrentHashMap<>(); private final Queue sessions = new ConcurrentLinkedQueue<>(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - private final ThreadPool threadPool; + private final Executor threadPool; private final SslContextFactory sslContextFactory; private final SelectorManager selector; @@ -190,12 +191,12 @@ public class SPDYClient this(null, sslContextFactory); } - public Factory(ThreadPool threadPool) + public Factory(Executor threadPool) { this(threadPool, null); } - public Factory(ThreadPool threadPool, SslContextFactory sslContextFactory) + public Factory(Executor threadPool, SslContextFactory sslContextFactory) { if (threadPool == null) threadPool = new QueuedThreadPool(); @@ -224,11 +225,6 @@ public class SPDYClient super.doStop(); } - public void join() throws InterruptedException - { - threadPool.join(); - } - protected String selectProtocol(List serverProtocols) { for (String serverProtocol : serverProtocols) @@ -272,7 +268,15 @@ public class SPDYClient @Override public boolean dispatch(Runnable task) { - return threadPool.dispatch(task); + try + { + threadPool.execute(task); + return true; + } + catch (RejectedExecutionException x) + { + return false; + } } @Override @@ -418,7 +422,7 @@ public class SPDYClient SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory); endPoint.setConnection(connection); - StandardSession session = new StandardSession(sessionPromise.client.version, factory.scheduler, connection, 1, sessionPromise.listener, generator); + StandardSession session = new StandardSession(sessionPromise.client.version, factory.threadPool, factory.scheduler, connection, 1, sessionPromise.listener, generator); parser.addListener(session); sessionPromise.completed(session); connection.setSession(session); diff --git a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java index b60fb8f62d9..ccd9549d896 100644 --- a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java +++ b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/SPDYServerConnector.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -40,6 +41,7 @@ import org.eclipse.jetty.spdy.api.SPDY; import org.eclipse.jetty.spdy.api.Session; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.ThreadPool; public class SPDYServerConnector extends SelectChannelConnector { @@ -47,8 +49,9 @@ public class SPDYServerConnector extends SelectChannelConnector private final Map factories = new LinkedHashMap<>(); private final Queue sessions = new ConcurrentLinkedQueue<>(); private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ServerSessionFrameListener listener; private final SslContextFactory sslContextFactory; - private final AsyncConnectionFactory defaultConnectionFactory; + private AsyncConnectionFactory defaultConnectionFactory; public SPDYServerConnector(ServerSessionFrameListener listener) { @@ -57,11 +60,25 @@ public class SPDYServerConnector extends SelectChannelConnector public SPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory) { + this.listener = listener; this.sslContextFactory = sslContextFactory; if (sslContextFactory != null) addBean(sslContextFactory); - defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, scheduler, listener); - putAsyncConnectionFactory("spdy/2", defaultConnectionFactory); + } + + protected Executor getExecutor() + { + final ThreadPool threadPool = getThreadPool(); + if (threadPool instanceof Executor) + return (Executor)threadPool; + return new Executor() + { + @Override + public void execute(Runnable command) + { + threadPool.dispatch(command); + } + }; } protected ScheduledExecutorService getScheduler() @@ -69,6 +86,14 @@ public class SPDYServerConnector extends SelectChannelConnector return scheduler; } + @Override + protected void doStart() throws Exception + { + super.doStart(); + defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), scheduler, listener); + putAsyncConnectionFactory("spdy/2", defaultConnectionFactory); + } + @Override protected void doStop() throws Exception { diff --git a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java index 3048e869388..ec107dddcdd 100644 --- a/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java +++ b/spdy-jetty/src/main/java/org/eclipse/jetty/spdy/ServerSPDYAsyncConnectionFactory.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy; import java.io.IOException; import java.nio.channels.SocketChannel; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.io.AsyncEndPoint; @@ -29,18 +30,20 @@ import org.eclipse.jetty.spdy.parser.Parser; public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory { + private final Executor threadPool; private final ScheduledExecutorService scheduler; private final short version; private final ServerSessionFrameListener listener; - public ServerSPDYAsyncConnectionFactory(short version, ScheduledExecutorService scheduler) + public ServerSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler) { - this(version, scheduler, null); + this(version, threadPool, scheduler, null); } - public ServerSPDYAsyncConnectionFactory(short version, ScheduledExecutorService scheduler, ServerSessionFrameListener listener) + public ServerSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, ServerSessionFrameListener listener) { this.version = version; + this.threadPool = threadPool; this.scheduler = scheduler; this.listener = listener; } @@ -61,7 +64,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector); endPoint.setConnection(connection); - final StandardSession session = new StandardSession(version, scheduler, connection, 2, listener, generator); + final StandardSession session = new StandardSession(version, threadPool, scheduler, connection, 2, listener, generator); parser.addListener(session); connection.setSession(session); diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java index 25b48f495ec..8e869e0c7c1 100644 --- a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/AbstractTest.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.spdy; import java.net.InetSocketAddress; +import java.util.concurrent.Executor; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.spdy.api.SPDY; @@ -25,7 +26,6 @@ import org.eclipse.jetty.spdy.api.SessionFrameListener; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; -import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.After; import org.junit.Rule; import org.junit.rules.TestWatchman; @@ -77,7 +77,7 @@ public abstract class AbstractTest return clientFactory.newSPDYClient(SPDY.V2).connect(socketAddress, listener).get(); } - protected SPDYClient.Factory newSPDYClientFactory(ThreadPool threadPool) + protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool) { return new SPDYClient.Factory(threadPool); } @@ -100,7 +100,6 @@ public abstract class AbstractTest if (clientFactory != null) { clientFactory.stop(); - clientFactory.join(); } if (server != null) { diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ConcurrentTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ConcurrentTest.java new file mode 100644 index 00000000000..a2c22be140e --- /dev/null +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/ConcurrentTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2012 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.eclipse.jetty.spdy; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.spdy.api.Headers; +import org.eclipse.jetty.spdy.api.ReplyInfo; +import org.eclipse.jetty.spdy.api.SPDYException; +import org.eclipse.jetty.spdy.api.Session; +import org.eclipse.jetty.spdy.api.Stream; +import org.eclipse.jetty.spdy.api.StreamFrameListener; +import org.eclipse.jetty.spdy.api.SynInfo; +import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; +import org.junit.Assert; +import org.junit.Test; + +public class ConcurrentTest extends AbstractTest +{ + @Test + public void testConcurrentSyn() throws Exception + { + final CountDownLatch slowServerLatch = new CountDownLatch(1); + final CountDownLatch fastServerLatch = new CountDownLatch(1); + Session session = startClient(startServer(new ServerSessionFrameListener.Adapter() + { + @Override + public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) + { + try + { + Headers headers = synInfo.getHeaders(); + String url = headers.get("url").value(); + switch (url) + { + case "/slow": + Assert.assertTrue(fastServerLatch.await(10, TimeUnit.SECONDS)); + slowServerLatch.countDown(); + break; + case "/fast": + fastServerLatch.countDown(); + break; + default: + Assert.fail(); + } + stream.reply(new ReplyInfo(true)); + return null; + } + catch (InterruptedException x) + { + throw new SPDYException(x); + } + } + }), null); + + final CountDownLatch slowClientLatch = new CountDownLatch(1); + Headers headers1 = new Headers(); + headers1.put("url", "/slow"); + session.syn(new SynInfo(headers1, true), new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + slowClientLatch.countDown(); + } + }); + + final CountDownLatch fastClientLatch = new CountDownLatch(1); + Headers headers2 = new Headers(); + headers2.put("url", "/fast"); + session.syn(new SynInfo(headers2, true), new StreamFrameListener.Adapter() + { + @Override + public void onReply(Stream stream, ReplyInfo replyInfo) + { + fastClientLatch.countDown(); + } + }); + + Assert.assertTrue(fastServerLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(fastClientLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(slowServerLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(slowClientLatch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLSynReplyTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLSynReplyTest.java index 9ddc3796edf..ee8ecdf70c4 100644 --- a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLSynReplyTest.java +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SSLSynReplyTest.java @@ -16,10 +16,11 @@ package org.eclipse.jetty.spdy; +import java.util.concurrent.Executor; + import org.eclipse.jetty.npn.NextProtoNego; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.Before; public class SSLSynReplyTest extends SynReplyTest @@ -32,7 +33,7 @@ public class SSLSynReplyTest extends SynReplyTest } @Override - protected SPDYClient.Factory newSPDYClientFactory(ThreadPool threadPool) + protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool) { SslContextFactory sslContextFactory = newSslContextFactory(); return new SPDYClient.Factory(threadPool, sslContextFactory); diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SettingsTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SettingsTest.java index aaf7d782f6a..39fb9e6916b 100644 --- a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SettingsTest.java +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SettingsTest.java @@ -39,14 +39,14 @@ public class SettingsTest extends AbstractTest settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowValue)); Settings.Setting setting1 = settings.get(Settings.ID.MAX_CONCURRENT_STREAMS); - Assert.assertSame(Settings.ID.MAX_CONCURRENT_STREAMS, setting1.getId()); - Assert.assertSame(Settings.Flag.PERSIST, setting1.getFlag()); - Assert.assertEquals(streamsValue, setting1.getValue()); + Assert.assertSame(Settings.ID.MAX_CONCURRENT_STREAMS, setting1.id()); + Assert.assertSame(Settings.Flag.PERSIST, setting1.flag()); + Assert.assertEquals(streamsValue, setting1.value()); Settings.Setting setting2 = settings.get(Settings.ID.INITIAL_WINDOW_SIZE); - Assert.assertSame(Settings.ID.INITIAL_WINDOW_SIZE, setting2.getId()); - Assert.assertSame(Settings.Flag.NONE, setting2.getFlag()); - Assert.assertEquals(windowValue, setting2.getValue()); + Assert.assertSame(Settings.ID.INITIAL_WINDOW_SIZE, setting2.id()); + Assert.assertSame(Settings.Flag.NONE, setting2.flag()); + Assert.assertEquals(windowValue, setting2.value()); } @Test