Updated API to support async write timeouts.

This commit is contained in:
Simone Bordet 2012-02-28 09:57:44 +01:00
parent 0be2ae4754
commit a26ae22e3f
8 changed files with 78 additions and 39 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -108,12 +109,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener) public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener)
{ {
Promise<Stream> result = new Promise<>(); Promise<Stream> result = new Promise<>();
syn(synInfo, listener, result); syn(synInfo, listener, 0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void syn(SynInfo synInfo, StreamFrameListener listener, final Handler<Stream> handler) public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, final Handler<Stream> handler)
{ {
// Synchronization is necessary. // Synchronization is necessary.
// SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
@ -151,12 +152,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Void> rst(RstInfo rstInfo) public Future<Void> rst(RstInfo rstInfo)
{ {
Promise<Void> result = new Promise<>(); Promise<Void> result = new Promise<>();
rst(rstInfo, result); rst(rstInfo, 0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void rst(RstInfo rstInfo, Handler<Void> handler) public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try try
{ {
@ -182,12 +183,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Void> settings(SettingsInfo settingsInfo) public Future<Void> settings(SettingsInfo settingsInfo)
{ {
Promise<Void> result = new Promise<>(); Promise<Void> result = new Promise<>();
settings(settingsInfo, result); settings(settingsInfo, 0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void settings(SettingsInfo settingsInfo, Handler<Void> handler) public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try try
{ {
@ -204,12 +205,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<PingInfo> ping() public Future<PingInfo> ping()
{ {
Promise<PingInfo> result = new Promise<>(); Promise<PingInfo> result = new Promise<>();
ping(result); ping(0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void ping(final Handler<PingInfo> handler) public void ping(long timeout, TimeUnit unit, final Handler<PingInfo> handler)
{ {
int pingId = pingIds.getAndAdd(2); int pingId = pingIds.getAndAdd(2);
PingInfo pingInfo = new PingInfo(pingId); PingInfo pingInfo = new PingInfo(pingId);
@ -228,12 +229,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler<Stand
public Future<Void> goAway() public Future<Void> goAway()
{ {
Promise<Void> result = new Promise<>(); Promise<Void> result = new Promise<>();
goAway(result); goAway(0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void goAway(Handler<Void> handler) public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
{ {
if (goAwaySent.compareAndSet(false, true)) if (goAwaySent.compareAndSet(false, true))
{ {

View File

@ -20,6 +20,7 @@ import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
@ -267,12 +268,12 @@ public class StandardStream implements IStream
public Future<Void> reply(ReplyInfo replyInfo) public Future<Void> reply(ReplyInfo replyInfo)
{ {
Promise<Void> result = new Promise<>(); Promise<Void> result = new Promise<>();
reply(replyInfo, result); reply(replyInfo, 0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void reply(ReplyInfo replyInfo, Handler<Void> handler) public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try try
{ {
@ -293,12 +294,12 @@ public class StandardStream implements IStream
public Future<Void> data(DataInfo dataInfo) public Future<Void> data(DataInfo dataInfo)
{ {
Promise<Void> result = new Promise<>(); Promise<Void> result = new Promise<>();
data(dataInfo, result); data(dataInfo, 0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void data(DataInfo dataInfo, Handler<Void> handler) public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
// Cannot update the close state here, because the data that we send may // Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size. // be flow controlled, so we need the stream to update the window size.
@ -309,12 +310,12 @@ public class StandardStream implements IStream
public Future<Void> headers(HeadersInfo headersInfo) public Future<Void> headers(HeadersInfo headersInfo)
{ {
Promise<Void> result = new Promise<>(); Promise<Void> result = new Promise<>();
headers(headersInfo, result); headers(headersInfo, 0, TimeUnit.MILLISECONDS, result);
return result; return result;
} }
@Override @Override
public void headers(HeadersInfo headersInfo, Handler<Void> handler) public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
{ {
try try
{ {

View File

@ -19,6 +19,7 @@ package org.eclipse.jetty.spdy.api;
import java.util.EventListener; import java.util.EventListener;
import java.util.List; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** /**
* <p>A {@link Session} represents the client-side endpoint of a SPDY connection to a single origin server.</p> * <p>A {@link Session} represents the client-side endpoint of a SPDY connection to a single origin server.</p>
@ -71,7 +72,7 @@ public interface Session
* @param synInfo the metadata to send on stream creation * @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created * @param listener the listener to invoke when events happen on the stream just created
* @return a future for the stream that will be created * @return a future for the stream that will be created
* @see #syn(SynInfo, StreamFrameListener, Handler) * @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler)
*/ */
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener); public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener);
@ -82,10 +83,12 @@ public interface Session
* *
* @param synInfo the metadata to send on stream creation * @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created * @param listener the listener to invoke when events happen on the stream just created
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of stream creation * @param handler the completion handler that gets notified of stream creation
* @see #syn(SynInfo, StreamFrameListener) * @see #syn(SynInfo, StreamFrameListener)
*/ */
public void syn(SynInfo synInfo, StreamFrameListener listener, Handler<Stream> handler); public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler);
/** /**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p> * <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
@ -93,6 +96,7 @@ public interface Session
* *
* @param rstInfo the metadata to reset the stream * @param rstInfo the metadata to reset the stream
* @return a future to wait for the reset to be sent * @return a future to wait for the reset to be sent
* @see #rst(RstInfo, long, TimeUnit, Handler)
*/ */
public Future<Void> rst(RstInfo rstInfo); public Future<Void> rst(RstInfo rstInfo);
@ -102,9 +106,12 @@ public interface Session
* reset has been actually sent.</p> * reset has been actually sent.</p>
* *
* @param rstInfo the metadata to reset the stream * @param rstInfo the metadata to reset the stream
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of reset's send * @param handler the completion handler that gets notified of reset's send
* @see #rst(RstInfo)
*/ */
public void rst(RstInfo rstInfo, Handler<Void> handler); public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p> * <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
@ -112,6 +119,7 @@ public interface Session
* *
* @param settingsInfo the metadata to send * @param settingsInfo the metadata to send
* @return a future to wait for the settings to be sent * @return a future to wait for the settings to be sent
* @see #settings(SettingsInfo, long, TimeUnit, Handler)
*/ */
public Future<Void> settings(SettingsInfo settingsInfo); public Future<Void> settings(SettingsInfo settingsInfo);
@ -121,15 +129,19 @@ public interface Session
* settings has been actually sent.</p> * settings has been actually sent.</p>
* *
* @param settingsInfo the metadata to send * @param settingsInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of settings' send * @param handler the completion handler that gets notified of settings' send
* @see #settings(SettingsInfo)
*/ */
public void settings(SettingsInfo settingsInfo, Handler<Void> handler); public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p> * <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
* <p>Callers may use the returned future to wait for the ping to be sent.</p> * <p>Callers may use the returned future to wait for the ping to be sent.</p>
* *
* @return a future for the metadata sent * @return a future for the metadata sent
* @see #ping(long, TimeUnit, Handler)
*/ */
public Future<PingInfo> ping(); public Future<PingInfo> ping();
@ -138,15 +150,19 @@ public interface Session
* <p>Callers may pass a non-null completion handler to be notified of when the * <p>Callers may pass a non-null completion handler to be notified of when the
* ping has been actually sent.</p> * ping has been actually sent.</p>
* *
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of ping's send * @param handler the completion handler that gets notified of ping's send
* @see #ping()
*/ */
public void ping(Handler<PingInfo> handler); public void ping(long timeout, TimeUnit unit, Handler<PingInfo> handler);
/** /**
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p> * <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
* <p>Callers may use the returned future to wait for the go away to be sent.</p> * <p>Callers may use the returned future to wait for the go away to be sent.</p>
* *
* @return a future to wait for the go away to be sent * @return a future to wait for the go away to be sent
* @see #goAway(long, TimeUnit, Handler)
*/ */
public Future<Void> goAway(); public Future<Void> goAway();
@ -155,9 +171,12 @@ public interface Session
* <p>Callers may pass a non-null completion handler to be notified of when the * <p>Callers may pass a non-null completion handler to be notified of when the
* go away has been actually sent.</p> * go away has been actually sent.</p>
* *
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of go away's send * @param handler the completion handler that gets notified of go away's send
* @see #goAway()
*/ */
public void goAway(Handler<Void> handler); public void goAway(long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* <p>Initiates the flush of data to the other peer.</p> * <p>Initiates the flush of data to the other peer.</p>

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy.api; package org.eclipse.jetty.spdy.api;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/** /**
* <p>A {@link Stream} represents an bidirectional exchange of data on top of a {@link Session}.</p> * <p>A {@link Stream} represents an bidirectional exchange of data on top of a {@link Session}.</p>
@ -57,6 +58,7 @@ public interface Stream
* *
* @param replyInfo the metadata to send * @param replyInfo the metadata to send
* @return a future to wait for the reply to be sent * @return a future to wait for the reply to be sent
* @see #reply(ReplyInfo, long, TimeUnit, Handler)
* @see SessionFrameListener#onSyn(Stream, SynInfo) * @see SessionFrameListener#onSyn(Stream, SynInfo)
*/ */
public Future<Void> reply(ReplyInfo replyInfo); public Future<Void> reply(ReplyInfo replyInfo);
@ -67,9 +69,12 @@ public interface Stream
* reply has been actually sent.</p> * reply has been actually sent.</p>
* *
* @param replyInfo the metadata to send * @param replyInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of reply sent * @param handler the completion handler that gets notified of reply sent
* @see #reply(ReplyInfo)
*/ */
public void reply(ReplyInfo replyInfo, Handler<Void> handler); public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* <p>Sends asynchronously a DATA frame on this stream.</p> * <p>Sends asynchronously a DATA frame on this stream.</p>
@ -78,6 +83,7 @@ public interface Stream
* *
* @param dataInfo the metadata to send * @param dataInfo the metadata to send
* @return a future to wait for the data to be sent * @return a future to wait for the data to be sent
* @see #data(DataInfo, long, TimeUnit, Handler)
* @see #reply(ReplyInfo) * @see #reply(ReplyInfo)
*/ */
public Future<Void> data(DataInfo dataInfo); public Future<Void> data(DataInfo dataInfo);
@ -89,9 +95,12 @@ public interface Stream
* data has been actually sent.</p> * data has been actually sent.</p>
* *
* @param dataInfo the metadata to send * @param dataInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of data sent * @param handler the completion handler that gets notified of data sent
* @see #data(DataInfo)
*/ */
public void data(DataInfo dataInfo, Handler<Void> handler); public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* <p>Sends asynchronously a HEADER frame on this stream.</p> * <p>Sends asynchronously a HEADER frame on this stream.</p>
@ -100,6 +109,7 @@ public interface Stream
* *
* @param headersInfo the metadata to send * @param headersInfo the metadata to send
* @return a future to wait for the headers to be sent * @return a future to wait for the headers to be sent
* @see #headers(HeadersInfo, long, TimeUnit, Handler)
* @see #reply(ReplyInfo) * @see #reply(ReplyInfo)
*/ */
public Future<Void> headers(HeadersInfo headersInfo); public Future<Void> headers(HeadersInfo headersInfo);
@ -111,9 +121,12 @@ public interface Stream
* headers have been actually sent.</p> * headers have been actually sent.</p>
* *
* @param headersInfo the metadata to send * @param headersInfo the metadata to send
* @param timeout the operation's timeout
* @param unit the timeout's unit
* @param handler the completion handler that gets notified of headers sent * @param handler the completion handler that gets notified of headers sent
* @see #headers(HeadersInfo)
*/ */
public void headers(HeadersInfo headersInfo, Handler<Void> handler); public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler);
/** /**
* @return whether this stream has been closed by both parties * @return whether this stream has been closed by both parties
@ -123,7 +136,9 @@ public interface Stream
/** /**
* @return whether this stream has been closed by one party only * @return whether this stream has been closed by one party only
* @see #isClosed() * @see #isClosed() * @param timeout the timeout for the stream creation
* @param unit the timeout's unit
*/ */
public boolean isHalfClosed(); public boolean isHalfClosed();

View File

@ -17,6 +17,7 @@
package org.eclipse.jetty.spdy.api; package org.eclipse.jetty.spdy.api;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.StandardSession; import org.eclipse.jetty.spdy.StandardSession;
import org.junit.Ignore; import org.junit.Ignore;
@ -82,7 +83,7 @@ public class ClientUsageTest
// Then issue another similar request // Then issue another similar request
stream.getSession().syn(new SynInfo(true), this); stream.getSession().syn(new SynInfo(true), this);
} }
}, new Handler.Adapter<Stream>() }, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{ {
@Override @Override
public void completed(Stream stream) public void completed(Stream stream)
@ -137,7 +138,7 @@ public class ClientUsageTest
} }
} }
}, new Handler.Adapter<Stream>() }, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{ {
@Override @Override
public void completed(Stream stream) public void completed(Stream stream)

View File

@ -16,6 +16,8 @@
package org.eclipse.jetty.spdy.api; package org.eclipse.jetty.spdy.api;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -71,7 +73,7 @@ public class ServerUsageTest
// //
// However, the API may allow to initiate the stream // However, the API may allow to initiate the stream
session.syn(new SynInfo(false), null, new Handler.Adapter<Stream>() session.syn(new SynInfo(false), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{ {
@Override @Override
public void completed(Stream stream) public void completed(Stream stream)
@ -101,7 +103,7 @@ public class ServerUsageTest
Session session = stream.getSession(); Session session = stream.getSession();
// Since it's unidirectional, no need to pass the listener // Since it's unidirectional, no need to pass the listener
session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, new Handler.Adapter<Stream>() session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{ {
@Override @Override
public void completed(Stream pushStream) public void completed(Stream pushStream)

View File

@ -65,7 +65,7 @@ public class PingTest extends AbstractTest
@Override @Override
public void onConnect(Session session) public void onConnect(Session session)
{ {
session.ping(new Handler.Adapter<PingInfo>() session.ping(0, TimeUnit.MILLISECONDS, new Handler.Adapter<PingInfo>()
{ {
@Override @Override
public void completed(PingInfo pingInfo) public void completed(PingInfo pingInfo)

View File

@ -280,7 +280,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertEquals(clientData, data); Assert.assertEquals(clientData, data);
clientDataLatch.countDown(); clientDataLatch.countDown();
} }
}, new Handler.Adapter<Stream>() }, 0, TimeUnit.MILLISECONDS, new Handler.Adapter<Stream>()
{ {
@Override @Override
public void completed(Stream stream) public void completed(Stream stream)