Implemented asynchronous notifications of frame listener callbacks.

This commit is contained in:
Simone Bordet 2012-02-28 15:32:51 +01:00
parent 0a276ed909
commit 980d728502
19 changed files with 397 additions and 121 deletions

View File

@ -30,6 +30,8 @@ public interface ISession extends Session
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context);
public void execute(Runnable task);
public int getWindowSize();
public interface Controller<T>

View File

@ -36,4 +36,6 @@ public interface IStream extends Stream
public void handle(ControlFrame frame);
public void handle(DataFrame dataFrame, ByteBuffer data);
public void post(Runnable task);
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -69,6 +70,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final Deque<FrameBytes> queue = new LinkedList<>();
private final Executor threadPool;
private final ScheduledExecutorService scheduler;
private final short version;
private final Controller<FrameBytes> controller;
@ -82,9 +84,10 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private boolean flushing;
private volatile int windowSize = 65536;
public StandardSession(short version, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, int initialStreamId, SessionFrameListener listener, Generator generator)
public StandardSession(short version, Executor threadPool, ScheduledExecutorService scheduler, Controller<FrameBytes> controller, int initialStreamId, SessionFrameListener listener, Generator generator)
{
this.version = version;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.controller = controller;
this.streamIds = new AtomicInteger(initialStreamId);
@ -353,8 +356,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
else
{
stream.handle(frame, data);
flush();
if (stream.isClosed())
{
updateLastStreamId(stream);
@ -376,28 +377,34 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
close();
}
private void onSyn(SynStreamFrame synStream)
private void onSyn(final SynStreamFrame frame)
{
IStream stream = new StandardStream(this, synStream);
final IStream stream = new StandardStream(this, frame);
logger.debug("Opening {}", stream);
int streamId = synStream.getStreamId();
Stream existing = streams.putIfAbsent(streamId, stream);
int streamId = frame.getStreamId();
IStream existing = streams.putIfAbsent(streamId, stream);
if (existing != null)
{
logger.debug("Detected duplicate {}, resetting", stream);
rst(new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR));
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
logger.debug("Duplicate stream, {}", rstInfo);
rst(rstInfo);
}
else
{
stream.handle(synStream);
StreamFrameListener listener = notifyOnSyn(stream, synStream);
stream.setStreamFrameListener(listener);
flush();
// The onSyn() listener may have sent a frame that closed the stream
if (stream.isClosed())
removeStream(stream);
stream.post(new Runnable()
{
@Override
public void run()
{
stream.handle(frame);
StreamFrameListener listener = notifyOnSyn(stream, frame);
stream.setStreamFrameListener(listener);
flush();
// The onSyn() listener may have sent a frame that closed the stream
if (stream.isClosed())
removeStream(stream);
}
});
}
}
@ -465,46 +472,86 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private void onReply(SynReplyFrame frame)
private void onReply(final SynReplyFrame frame)
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
stream.handle(frame);
flush();
if (stream.isClosed())
removeStream(stream);
final IStream stream = streams.get(streamId);
if (stream == null)
{
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
logger.debug("Unknown stream {}", rstInfo);
rst(rstInfo);
}
else
{
stream.post(new Runnable()
{
@Override
public void run()
{
stream.handle(frame);
if (stream.isClosed())
removeStream(stream);
}
});
}
}
private void onRst(RstStreamFrame frame)
private void onRst(final RstStreamFrame frame)
{
// TODO: implement logic to clean up unidirectional streams associated with this stream
notifyOnRst(frame);
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream != null)
removeStream(stream);
final IStream stream = streams.get(streamId);
execute(new Runnable()
{
@Override
public void run()
{
// TODO: implement logic to clean up unidirectional streams associated with this stream
if (stream != null)
stream.handle(frame);
notifyOnRst(frame);
flush();
if (stream != null)
removeStream(stream);
}
});
}
private void onSettings(SettingsFrame frame)
private void onSettings(final SettingsFrame frame)
{
Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
if (windowSizeSetting != null)
this.windowSize = windowSizeSetting.getValue();
notifyOnSettings(frame);
flush();
this.windowSize = windowSizeSetting.value();
execute(new Runnable()
{
@Override
public void run()
{
notifyOnSettings(frame);
flush();
}
});
}
private void onPing(PingFrame frame)
private void onPing(final PingFrame frame)
{
try
{
int pingId = frame.getPingId();
if (pingId % 2 == pingIds.get() % 2)
{
notifyOnPing(frame);
flush();
execute(new Runnable()
{
@Override
public void run()
{
notifyOnPing(frame);
flush();
}
});
}
else
{
@ -517,20 +564,61 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private void onGoAway(GoAwayFrame frame)
private void onGoAway(final GoAwayFrame frame)
{
if (goAwayReceived.compareAndSet(false, true))
{
notifyOnGoAway(frame);
flush();
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id,
// tried our best to flush remaining data, and close.
close();
execute(new Runnable()
{
@Override
public void run()
{
notifyOnGoAway(frame);
flush();
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id,
// tried our best to flush remaining data, and close.
close();
}
});
}
}
private void onHeaders(final HeadersFrame frame)
{
int streamId = frame.getStreamId();
final IStream stream = streams.get(streamId);
if (stream == null)
{
RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
logger.debug("Unknown stream, {}", rstInfo);
rst(rstInfo);
}
else
{
stream.post(new Runnable()
{
@Override
public void run()
{
stream.handle(frame);
if (stream.isClosed())
removeStream(stream);
}
});
}
}
private void onWindowUpdate(WindowUpdateFrame frame)
{
// TODO: review flow control
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream != null)
stream.handle(frame);
}
protected void close()
{
// Check for null to support tests
@ -538,25 +626,6 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
controller.close(false);
}
private void onHeaders(HeadersFrame frame)
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
stream.handle(frame);
flush();
if (stream.isClosed())
removeStream(stream);
}
private void onWindowUpdate(WindowUpdateFrame frame)
{
int streamId = frame.getStreamId();
IStream stream = streams.get(streamId);
if (stream != null)
stream.handle(frame);
flush();
}
private StreamFrameListener notifyOnSyn(Stream stream, SynStreamFrame frame)
{
try
@ -647,7 +716,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context) throws StreamException
{
if (stream != null)
updateLastStreamId(stream);
updateLastStreamId(stream); // TODO: not sure this is right
ByteBuffer buffer = generator.control(frame);
logger.debug("Queuing {} on {}", frame, stream);
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(frame, buffer, handler, context);
@ -684,6 +753,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
flush();
}
@Override
public void execute(Runnable task)
{
threadPool.execute(task);
}
@Override
public int getWindowSize()
{
@ -889,4 +964,5 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
return String.format("DATA bytes @%x consumed=%b on %s", data.hashCode(), data.isConsumed(), stream);
}
}
}

View File

@ -17,7 +17,9 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@ -46,6 +48,7 @@ public class StandardStream implements IStream
{
private static final Logger logger = LoggerFactory.getLogger(Stream.class);
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private final Queue<Runnable> queue = new LinkedList<>();
private final ISession session;
private final SynStreamFrame frame;
private final AtomicInteger windowSize;
@ -53,6 +56,7 @@ public class StandardStream implements IStream
private volatile boolean opened;
private volatile boolean halfClosed;
private volatile boolean closed;
private boolean dispatched;
public StandardStream(ISession session, SynStreamFrame frame)
{
@ -165,11 +169,17 @@ public class StandardStream implements IStream
updateWindowSize(windowUpdate.getWindowDelta());
break;
}
case RST_STREAM:
{
// TODO:
break;
}
default:
{
throw new IllegalStateException();
}
}
session.flush();
}
@Override
@ -191,6 +201,46 @@ public class StandardStream implements IStream
// the application listeners because they may block
windowUpdate(length);
}
session.flush();
}
@Override
public void post(Runnable task)
{
synchronized (queue)
{
logger.debug("Posting task {}", task);
queue.offer(task);
dispatch();
}
}
private void dispatch()
{
synchronized (queue)
{
if (dispatched)
return;
final Runnable task = queue.poll();
if (task != null)
{
dispatched = true;
logger.debug("Dispatching task {}", task);
session.execute(new Runnable()
{
@Override
public void run()
{
logger.debug("Executing task {}", task);
task.run();
logger.debug("Completing task {}", task);
dispatched = false;
dispatch();
}
});
}
}
}
private void windowUpdate(int delta)
@ -280,7 +330,6 @@ public class StandardStream implements IStream
updateCloseState(replyInfo.isClose());
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, timeout, unit, handler, null);
session.flush();
}
catch (StreamException x)
{

View File

@ -51,4 +51,10 @@ public class RstInfo
{
return streamStatus;
}
@Override
public String toString()
{
return String.format("RST stream=%d %s", streamId, streamStatus);
}
}

View File

@ -43,7 +43,7 @@ public class Settings implements Iterable<Settings.Setting>
public void put(Setting setting)
{
settings.put(setting.getId(), setting);
settings.put(setting.id(), setting);
}
public Setting remove(ID id)
@ -172,17 +172,17 @@ public class Settings implements Iterable<Settings.Setting>
this.value = value;
}
public ID getId()
public ID id()
{
return id;
}
public Flag getFlag()
public Flag flag()
{
return flag;
}
public int getValue()
public int value()
{
return value;
}
@ -210,7 +210,7 @@ public class Settings implements Iterable<Settings.Setting>
@Override
public String toString()
{
return String.format("[id=%s,flags=%s:value=%d]", getId(), getFlag(), getValue());
return String.format("[id=%s,flags=%s:value=%d]", id(), flag(), value());
}
}
}

View File

@ -42,12 +42,12 @@ public class SettingsGenerator extends ControlFrameGenerator
for (Settings.Setting setting : settings)
{
int id = setting.getId().getCode();
int flags = setting.getFlag().getCode();
int id = setting.id().getCode();
int flags = setting.flag().getCode();
int idAndFlags = (id << 8) + flags;
idAndFlags = convertIdAndFlags(frame.getVersion(), idAndFlags);
buffer.putInt(idAndFlags);
buffer.putInt(setting.getValue());
buffer.putInt(setting.value());
}
buffer.flip();

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -42,9 +43,10 @@ public class AsyncTimeoutTest
final long timeout = 1000;
final TimeUnit unit = TimeUnit.MILLISECONDS;
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, scheduler, new TestController(), 1, null, generator)
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), 1, null, generator)
{
@Override
public void flush()
@ -85,9 +87,10 @@ public class AsyncTimeoutTest
final long timeout = 1000;
final TimeUnit unit = TimeUnit.MILLISECONDS;
Executor threadPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(SPDY.V2, scheduler, new TestController(), 1, null, generator)
Session session = new StandardSession(SPDY.V2, threadPool, scheduler, new TestController(), 1, null, generator)
{
private final AtomicInteger flushes = new AtomicInteger();

View File

@ -29,7 +29,7 @@ public class ClientUsageTest
@Test
public void testClientRequestResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@ -48,7 +48,7 @@ public class ClientUsageTest
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@ -69,7 +69,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
final String context = "context";
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
@ -104,7 +104,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, null, 1, null, null);
Session session = new StandardSession(SPDY.V2, null, null, null, 1, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{

View File

@ -33,14 +33,20 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
public HTTPSPDYServerConnector(SslContextFactory sslContextFactory)
{
super(null, sslContextFactory);
// Override the "spdy/2" protocol by handling HTTP over SPDY
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getScheduler(), this));
// Add the "http/1.1" protocol for browsers that do not support NPN
putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
// Override the default connection factory for non-SSL connections
defaultConnectionFactory = new ServerHTTPAsyncConnectionFactory(this);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
// Override the "spdy/2" protocol by handling HTTP over SPDY
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), getScheduler(), this));
// Add the "http/1.1" protocol for browsers that do not support NPN
putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
}
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpException;
@ -42,9 +43,9 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnectionFactory.class);
private final Connector connector;
public ServerHTTPSPDYAsyncConnectionFactory(short version, ScheduledExecutorService scheduler, Connector connector)
public ServerHTTPSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, Connector connector)
{
super(version, scheduler);
super(version, threadPool, scheduler);
this.connector = connector;
}

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy.http;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
@ -27,7 +28,6 @@ import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.TestWatchman;
@ -71,7 +71,7 @@ public abstract class AbstractHTTPSPDYTest
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getScheduler(), this);
return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), getScheduler(), this);
}
};
}
@ -88,7 +88,7 @@ public abstract class AbstractHTTPSPDYTest
return clientFactory.newSPDYClient(SPDY.V2).connect(socketAddress, listener).get();
}
protected SPDYClient.Factory newSPDYClientFactory(ThreadPool threadPool)
protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool)
{
return new SPDYClient.Factory(threadPool);
}
@ -99,7 +99,6 @@ public abstract class AbstractHTTPSPDYTest
if (clientFactory != null)
{
clientFactory.stop();
clientFactory.join();
}
if (server != null)
{

View File

@ -33,8 +33,10 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
@ -54,7 +56,6 @@ import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
public class SPDYClient
{
@ -176,7 +177,7 @@ public class SPDYClient
private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ThreadPool threadPool;
private final Executor threadPool;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
@ -190,12 +191,12 @@ public class SPDYClient
this(null, sslContextFactory);
}
public Factory(ThreadPool threadPool)
public Factory(Executor threadPool)
{
this(threadPool, null);
}
public Factory(ThreadPool threadPool, SslContextFactory sslContextFactory)
public Factory(Executor threadPool, SslContextFactory sslContextFactory)
{
if (threadPool == null)
threadPool = new QueuedThreadPool();
@ -224,11 +225,6 @@ public class SPDYClient
super.doStop();
}
public void join() throws InterruptedException
{
threadPool.join();
}
protected String selectProtocol(List<String> serverProtocols)
{
for (String serverProtocol : serverProtocols)
@ -272,7 +268,15 @@ public class SPDYClient
@Override
public boolean dispatch(Runnable task)
{
return threadPool.dispatch(task);
try
{
threadPool.execute(task);
return true;
}
catch (RejectedExecutionException x)
{
return false;
}
}
@Override
@ -418,7 +422,7 @@ public class SPDYClient
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.scheduler, connection, 1, sessionPromise.listener, generator);
StandardSession session = new StandardSession(sessionPromise.client.version, factory.threadPool, factory.scheduler, connection, 1, sessionPromise.listener, generator);
parser.addListener(session);
sessionPromise.completed(session);
connection.setSession(session);

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -40,6 +41,7 @@ import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ThreadPool;
public class SPDYServerConnector extends SelectChannelConnector
{
@ -47,8 +49,9 @@ public class SPDYServerConnector extends SelectChannelConnector
private final Map<String, AsyncConnectionFactory> factories = new LinkedHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ServerSessionFrameListener listener;
private final SslContextFactory sslContextFactory;
private final AsyncConnectionFactory defaultConnectionFactory;
private AsyncConnectionFactory defaultConnectionFactory;
public SPDYServerConnector(ServerSessionFrameListener listener)
{
@ -57,11 +60,25 @@ public class SPDYServerConnector extends SelectChannelConnector
public SPDYServerConnector(ServerSessionFrameListener listener, SslContextFactory sslContextFactory)
{
this.listener = listener;
this.sslContextFactory = sslContextFactory;
if (sslContextFactory != null)
addBean(sslContextFactory);
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, scheduler, listener);
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
}
protected Executor getExecutor()
{
final ThreadPool threadPool = getThreadPool();
if (threadPool instanceof Executor)
return (Executor)threadPool;
return new Executor()
{
@Override
public void execute(Runnable command)
{
threadPool.dispatch(command);
}
};
}
protected ScheduledExecutorService getScheduler()
@ -69,6 +86,14 @@ public class SPDYServerConnector extends SelectChannelConnector
return scheduler;
}
@Override
protected void doStart() throws Exception
{
super.doStart();
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, getExecutor(), scheduler, listener);
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
}
@Override
protected void doStop() throws Exception
{

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
@ -29,18 +30,20 @@ import org.eclipse.jetty.spdy.parser.Parser;
public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
{
private final Executor threadPool;
private final ScheduledExecutorService scheduler;
private final short version;
private final ServerSessionFrameListener listener;
public ServerSPDYAsyncConnectionFactory(short version, ScheduledExecutorService scheduler)
public ServerSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler)
{
this(version, scheduler, null);
this(version, threadPool, scheduler, null);
}
public ServerSPDYAsyncConnectionFactory(short version, ScheduledExecutorService scheduler, ServerSessionFrameListener listener)
public ServerSPDYAsyncConnectionFactory(short version, Executor threadPool, ScheduledExecutorService scheduler, ServerSessionFrameListener listener)
{
this.version = version;
this.threadPool = threadPool;
this.scheduler = scheduler;
this.listener = listener;
}
@ -61,7 +64,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, scheduler, connection, 2, listener, generator);
final StandardSession session = new StandardSession(version, threadPool, scheduler, connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.api.SPDY;
@ -25,7 +26,6 @@ import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.TestWatchman;
@ -77,7 +77,7 @@ public abstract class AbstractTest
return clientFactory.newSPDYClient(SPDY.V2).connect(socketAddress, listener).get();
}
protected SPDYClient.Factory newSPDYClientFactory(ThreadPool threadPool)
protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool)
{
return new SPDYClient.Factory(threadPool);
}
@ -100,7 +100,6 @@ public abstract class AbstractTest
if (clientFactory != null)
{
clientFactory.stop();
clientFactory.join();
}
if (server != null)
{

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Assert;
import org.junit.Test;
public class ConcurrentTest extends AbstractTest
{
@Test
public void testConcurrentSyn() throws Exception
{
final CountDownLatch slowServerLatch = new CountDownLatch(1);
final CountDownLatch fastServerLatch = new CountDownLatch(1);
Session session = startClient(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
try
{
Headers headers = synInfo.getHeaders();
String url = headers.get("url").value();
switch (url)
{
case "/slow":
Assert.assertTrue(fastServerLatch.await(10, TimeUnit.SECONDS));
slowServerLatch.countDown();
break;
case "/fast":
fastServerLatch.countDown();
break;
default:
Assert.fail();
}
stream.reply(new ReplyInfo(true));
return null;
}
catch (InterruptedException x)
{
throw new SPDYException(x);
}
}
}), null);
final CountDownLatch slowClientLatch = new CountDownLatch(1);
Headers headers1 = new Headers();
headers1.put("url", "/slow");
session.syn(new SynInfo(headers1, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
slowClientLatch.countDown();
}
});
final CountDownLatch fastClientLatch = new CountDownLatch(1);
Headers headers2 = new Headers();
headers2.put("url", "/fast");
session.syn(new SynInfo(headers2, true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
fastClientLatch.countDown();
}
});
Assert.assertTrue(fastServerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(fastClientLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(slowServerLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(slowClientLatch.await(5, TimeUnit.SECONDS));
}
}

View File

@ -16,10 +16,11 @@
package org.eclipse.jetty.spdy;
import java.util.concurrent.Executor;
import org.eclipse.jetty.npn.NextProtoNego;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.junit.Before;
public class SSLSynReplyTest extends SynReplyTest
@ -32,7 +33,7 @@ public class SSLSynReplyTest extends SynReplyTest
}
@Override
protected SPDYClient.Factory newSPDYClientFactory(ThreadPool threadPool)
protected SPDYClient.Factory newSPDYClientFactory(Executor threadPool)
{
SslContextFactory sslContextFactory = newSslContextFactory();
return new SPDYClient.Factory(threadPool, sslContextFactory);

View File

@ -39,14 +39,14 @@ public class SettingsTest extends AbstractTest
settings.put(new Settings.Setting(Settings.ID.INITIAL_WINDOW_SIZE, windowValue));
Settings.Setting setting1 = settings.get(Settings.ID.MAX_CONCURRENT_STREAMS);
Assert.assertSame(Settings.ID.MAX_CONCURRENT_STREAMS, setting1.getId());
Assert.assertSame(Settings.Flag.PERSIST, setting1.getFlag());
Assert.assertEquals(streamsValue, setting1.getValue());
Assert.assertSame(Settings.ID.MAX_CONCURRENT_STREAMS, setting1.id());
Assert.assertSame(Settings.Flag.PERSIST, setting1.flag());
Assert.assertEquals(streamsValue, setting1.value());
Settings.Setting setting2 = settings.get(Settings.ID.INITIAL_WINDOW_SIZE);
Assert.assertSame(Settings.ID.INITIAL_WINDOW_SIZE, setting2.getId());
Assert.assertSame(Settings.Flag.NONE, setting2.getFlag());
Assert.assertEquals(windowValue, setting2.getValue());
Assert.assertSame(Settings.ID.INITIAL_WINDOW_SIZE, setting2.id());
Assert.assertSame(Settings.Flag.NONE, setting2.flag());
Assert.assertEquals(windowValue, setting2.value());
}
@Test