Implemented support for async write timeouts.

This commit is contained in:
Simone Bordet 2012-02-28 11:19:33 +01:00
parent a26ae22e3f
commit 0dbcaff911
11 changed files with 253 additions and 42 deletions

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
@ -25,9 +26,9 @@ import org.eclipse.jetty.spdy.frames.ControlFrame;
public interface ISession extends Session
{
public <C> void control(IStream stream, ControlFrame frame, Handler<C> handler, C context) throws StreamException;
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context) throws StreamException;
public <C> void data(IStream stream, DataInfo dataInfo, Handler<C> handler, C context);
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context);
public int getWindowSize();

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
@ -25,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -66,6 +69,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
private final Deque<FrameBytes> queue = new LinkedList<>();
private final ScheduledExecutorService scheduler;
private final short version;
private final Controller<FrameBytes> controller;
private final AtomicInteger streamIds;
@ -78,8 +82,9 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
private boolean flushing;
private volatile int windowSize = 65536;
public StandardSession(short version, Controller<FrameBytes> controller, int initialStreamId, SessionFrameListener listener, Generator generator)
public StandardSession(ScheduledExecutorService scheduler, short version, Controller<FrameBytes> controller, int initialStreamId, SessionFrameListener listener, Generator generator)
{
this.scheduler = scheduler;
this.version = version;
this.controller = controller;
this.streamIds = new AtomicInteger(initialStreamId);
@ -137,7 +142,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
// May throw if wrong version or headers too big
control(stream, synStream, handler, stream);
control(stream, synStream, timeout, unit, handler, stream);
}
catch (StreamException x)
{
@ -169,7 +174,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
else
{
RstStreamFrame frame = new RstStreamFrame(version, rstInfo.getStreamId(), rstInfo.getStreamStatus().getCode(version));
control(null, frame, handler, null);
control(null, frame, timeout, unit, handler, null);
}
}
catch (StreamException x)
@ -193,7 +198,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
control(null, frame, handler, null);
control(null, frame, timeout, unit, handler, null);
}
catch (StreamException x)
{
@ -217,7 +222,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
PingFrame frame = new PingFrame(version, pingId);
control(null, frame, handler, pingInfo);
control(null, frame, timeout, unit, handler, pingInfo);
}
catch (StreamException x)
{
@ -243,7 +248,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
try
{
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), SessionStatus.OK.getCode());
control(null, frame, handler, null);
control(null, frame, timeout, unit, handler, null);
return;
}
catch (StreamException x)
@ -368,10 +373,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public void onSessionException(SessionException x)
{
// TODO: must send a GOAWAY with the x.sessionStatus, then close
// Check for null to support tests
if (controller != null)
controller.close(true);
close();
}
private void onSyn(SynStreamFrame synStream)
@ -506,7 +508,7 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
else
{
control(null, frame, new Promise<>(), null);
control(null, frame, 0, TimeUnit.MILLISECONDS, new Promise<>(), null);
}
}
catch (StreamException x)
@ -525,10 +527,17 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
// SPDY does not require to send back a response to a GO_AWAY.
// We notified the application of the last good stream id,
// tried our best to flush remaining data, and close.
controller.close(false);
close();
}
}
protected void close()
{
// Check for null to support tests
if (controller != null)
controller.close(false);
}
private void onHeaders(HeadersFrame frame)
{
int streamId = frame.getStreamId();
@ -635,13 +644,16 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public <C> void control(IStream stream, ControlFrame frame, Handler<C> handler, C context) throws StreamException
public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context) throws StreamException
{
if (stream != null)
updateLastStreamId(stream);
ByteBuffer buffer = generator.control(frame);
logger.debug("Posting {}", frame);
enqueueLast(new ControlFrameBytes<>(frame, buffer, handler, context));
logger.debug("Queuing {} on {}", frame, stream);
ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(frame, buffer, handler, context);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
enqueueLast(frameBytes);
flush();
}
@ -662,10 +674,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
@Override
public <C> void data(IStream stream, DataInfo dataInfo, Handler<C> handler, C context)
public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context)
{
logger.debug("Posting {} on {}", dataInfo, stream);
enqueueLast(new DataFrameBytes<>(stream, dataInfo, handler, context));
logger.debug("Queuing {} on {}", dataInfo, stream);
DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream, dataInfo, handler, context);
if (timeout > 0)
frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
enqueueLast(frameBytes);
flush();
}
@ -742,7 +757,8 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
protected void write(final ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
{
controller.write(buffer, handler, frameBytes);
if (controller != null)
controller.write(buffer, handler, frameBytes);
}
public interface FrameBytes
@ -752,12 +768,13 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public abstract void complete();
}
private class ControlFrameBytes<C> implements FrameBytes
private class ControlFrameBytes<C> implements FrameBytes, Runnable
{
private final ControlFrame frame;
private final ByteBuffer buffer;
private final Handler<C> handler;
private final C context;
private volatile ScheduledFuture<?> task;
private ControlFrameBytes(ControlFrame frame, ByteBuffer buffer, Handler<C> handler, C context)
{
@ -776,15 +793,27 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
@Override
public void complete()
{
ScheduledFuture<?> task = this.task;
if (task != null)
task.cancel(false);
if (frame.getType() == ControlFrameType.GO_AWAY)
{
// After sending a GO_AWAY we need to hard close the connection.
// Recipients will know the last good stream id and act accordingly.
controller.close(false);
close();
}
handler.completed(context);
}
@Override
public void run()
{
close();
handler.failed(new InterruptedByTimeoutException());
}
@Override
public String toString()
{
@ -792,13 +821,14 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
}
private class DataFrameBytes<C> implements FrameBytes
private class DataFrameBytes<C> implements FrameBytes, Runnable
{
private final IStream stream;
private final DataInfo data;
private final Handler<C> handler;
private final C context;
private int dataLength;
private volatile ScheduledFuture<?> task;
private DataFrameBytes(IStream stream, DataInfo data, Handler<C> handler, C context)
{
@ -834,13 +864,25 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
}
else
{
ScheduledFuture<?> task = this.task;
if (task != null)
task.cancel(false);
stream.updateCloseState(data.isClose());
if (stream.isClosed())
removeStream(stream);
handler.completed(context);
}
}
@Override
public void run()
{
close();
handler.failed(new InterruptedByTimeoutException());
}
@Override
public String toString()
{

View File

@ -203,7 +203,7 @@ public class StandardStream implements IStream
// we will send many window update frames... perhaps we can delay
// window update frames until we have a bigger delta to send
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta);
session.control(this, windowUpdateFrame, new Promise<>(), null);
session.control(this, windowUpdateFrame, 0, TimeUnit.MILLISECONDS, new Promise<>(), null);
}
}
catch (StreamException x)
@ -279,7 +279,7 @@ public class StandardStream implements IStream
{
updateCloseState(replyInfo.isClose());
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame, handler, null);
session.control(this, frame, timeout, unit, handler, null);
session.flush();
}
catch (StreamException x)
@ -303,7 +303,7 @@ public class StandardStream implements IStream
{
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo, handler, null);
session.data(this, dataInfo, timeout, unit, handler, null);
}
@Override
@ -321,7 +321,7 @@ public class StandardStream implements IStream
{
updateCloseState(headersInfo.isClose());
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame, handler, null);
session.control(this, frame, timeout, unit, handler, null);
}
catch (StreamException x)
{

View File

@ -0,0 +1,144 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.generator.Generator;
import org.junit.Assert;
import org.junit.Test;
public class AsyncTimeoutTest
{
@Test
public void testAsyncTimeoutInControlFrames() throws Exception
{
final long timeout = 1000;
final TimeUnit unit = TimeUnit.MILLISECONDS;
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(scheduler, SPDY.V2, new TestController(), 1, null, generator)
{
@Override
public void flush()
{
try
{
unit.sleep(2 * timeout);
super.flush();
}
catch (InterruptedException x)
{
throw new SPDYException(x);
}
}
};
final CountDownLatch failedLatch = new CountDownLatch(1);
session.syn(new SynInfo(true), null, timeout, unit, new Handler<Stream>()
{
@Override
public void completed(Stream stream)
{
}
@Override
public void failed(Throwable x)
{
failedLatch.countDown();
}
});
Assert.assertTrue(failedLatch.await(2 * timeout, unit));
}
@Test
public void testAsyncTimeoutInDataFrames() throws Exception
{
final long timeout = 1000;
final TimeUnit unit = TimeUnit.MILLISECONDS;
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
Generator generator = new Generator(new StandardCompressionFactory.StandardCompressor());
Session session = new StandardSession(scheduler, SPDY.V2, new TestController(), 1, null, generator)
{
private final AtomicInteger flushes = new AtomicInteger();
@Override
public void flush()
{
try
{
int flushes = this.flushes.incrementAndGet();
if (flushes == 3)
unit.sleep(2 * timeout);
super.flush();
}
catch (InterruptedException x)
{
throw new SPDYException(x);
}
}
};
Stream stream = session.syn(new SynInfo(false), null).get(5, TimeUnit.SECONDS);
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.data(new StringDataInfo("data", true), timeout, unit, new Handler<Void>()
{
@Override
public void completed(Void context)
{
}
@Override
public void failed(Throwable x)
{
failedLatch.countDown();
}
});
Assert.assertTrue(failedLatch.await(2 * timeout, unit));
}
private static class TestController implements ISession.Controller<StandardSession.FrameBytes>
{
@Override
public int write(ByteBuffer buffer, Handler<StandardSession.FrameBytes> handler, StandardSession.FrameBytes context)
{
handler.completed(context);
return buffer.remaining();
}
@Override
public void close(boolean onlyOutput)
{
}
}
}

View File

@ -29,7 +29,7 @@ public class ClientUsageTest
@Test
public void testClientRequestResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
Session session = new StandardSession(null, SPDY.V2, null, 1, null, null);
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@ -48,7 +48,7 @@ public class ClientUsageTest
@Test
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
Session session = new StandardSession(null, SPDY.V2, null, 1, null, null);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@ -69,7 +69,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
Session session = new StandardSession(null, SPDY.V2, null, 1, null, null);
final String context = "context";
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
@ -104,7 +104,7 @@ public class ClientUsageTest
@Test
public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
Session session = new StandardSession(null, SPDY.V2, null, 1, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{

View File

@ -34,7 +34,7 @@ public class HTTPSPDYServerConnector extends SPDYServerConnector
{
super(null, sslContextFactory);
// Override the "spdy/2" protocol by handling HTTP over SPDY
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, this));
putAsyncConnectionFactory("spdy/2", new ServerHTTPSPDYAsyncConnectionFactory(getScheduler(), SPDY.V2, this));
// Add the "http/1.1" protocol for browsers that do not support NPN
putAsyncConnectionFactory("http/1.1", new ServerHTTPAsyncConnectionFactory(this));
// Override the default connection factory for non-SSL connections

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.http;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.io.AsyncEndPoint;
@ -41,9 +42,9 @@ public class ServerHTTPSPDYAsyncConnectionFactory extends ServerSPDYAsyncConnect
private static final Logger logger = LoggerFactory.getLogger(ServerHTTPSPDYAsyncConnectionFactory.class);
private final Connector connector;
public ServerHTTPSPDYAsyncConnectionFactory(short version, Connector connector)
public ServerHTTPSPDYAsyncConnectionFactory(ScheduledExecutorService scheduler, short version, Connector connector)
{
super(version);
super(scheduler, version);
this.connector = connector;
}

View File

@ -71,7 +71,7 @@ public abstract class AbstractHTTPSPDYTest
@Override
protected AsyncConnectionFactory getDefaultAsyncConnectionFactory()
{
return new ServerHTTPSPDYAsyncConnectionFactory(SPDY.V2, this);
return new ServerHTTPSPDYAsyncConnectionFactory(getScheduler(), SPDY.V2, this);
}
};
}

View File

@ -33,7 +33,9 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
@ -173,6 +175,7 @@ public class SPDYClient
{
private final Map<String, AsyncConnectionFactory> factories = new ConcurrentHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ThreadPool threadPool;
private final SslContextFactory sslContextFactory;
private final SelectorManager selector;
@ -415,7 +418,7 @@ public class SPDYClient
SPDYAsyncConnection connection = new ClientSPDYAsyncConnection(endPoint, parser, factory);
endPoint.setConnection(connection);
StandardSession session = new StandardSession(sessionPromise.client.version, connection, 1, sessionPromise.listener, generator);
StandardSession session = new StandardSession(factory.scheduler, sessionPromise.client.version, connection, 1, sessionPromise.listener, generator);
parser.addListener(session);
sessionPromise.completed(session);
connection.setSession(session);

View File

@ -25,6 +25,9 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
@ -43,6 +46,7 @@ public class SPDYServerConnector extends SelectChannelConnector
// Order is important on server side, so we use a LinkedHashMap
private final Map<String, AsyncConnectionFactory> factories = new LinkedHashMap<>();
private final Queue<Session> sessions = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final SslContextFactory sslContextFactory;
private final AsyncConnectionFactory defaultConnectionFactory;
@ -56,17 +60,30 @@ public class SPDYServerConnector extends SelectChannelConnector
this.sslContextFactory = sslContextFactory;
if (sslContextFactory != null)
addBean(sslContextFactory);
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(SPDY.V2, listener);
defaultConnectionFactory = new ServerSPDYAsyncConnectionFactory(scheduler, SPDY.V2, listener);
putAsyncConnectionFactory("spdy/2", defaultConnectionFactory);
}
protected ScheduledExecutorService getScheduler()
{
return scheduler;
}
@Override
protected void doStop() throws Exception
{
closeSessions();
scheduler.shutdown();
super.doStop();
}
@Override
public void join() throws InterruptedException
{
scheduler.awaitTermination(0, TimeUnit.MILLISECONDS);
super.join();
}
public AsyncConnectionFactory getAsyncConnectionFactory(String protocol)
{
synchronized (factories)

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy;
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Connection;
@ -28,16 +29,18 @@ import org.eclipse.jetty.spdy.parser.Parser;
public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
{
private final ScheduledExecutorService scheduler;
private final short version;
private final ServerSessionFrameListener listener;
public ServerSPDYAsyncConnectionFactory(short version)
public ServerSPDYAsyncConnectionFactory(ScheduledExecutorService scheduler, short version)
{
this(version, null);
this(scheduler, version, null);
}
public ServerSPDYAsyncConnectionFactory(short version, ServerSessionFrameListener listener)
public ServerSPDYAsyncConnectionFactory(ScheduledExecutorService scheduler, short version, ServerSessionFrameListener listener)
{
this.scheduler = scheduler;
this.version = version;
this.listener = listener;
}
@ -58,7 +61,7 @@ public class ServerSPDYAsyncConnectionFactory implements AsyncConnectionFactory
SPDYAsyncConnection connection = new ServerSPDYAsyncConnection(endPoint, parser, listener, connector);
endPoint.setConnection(connection);
final StandardSession session = new StandardSession(version, connection, 2, listener, generator);
final StandardSession session = new StandardSession(scheduler, version, connection, 2, listener, generator);
parser.addListener(session);
connection.setSession(session);