From a26ae22e3fbaca958d165a63116edad741bcc409 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 28 Feb 2012 09:57:44 +0100 Subject: [PATCH] Updated API to support async write timeouts. --- .../eclipse/jetty/spdy/StandardSession.java | 21 +++++----- .../eclipse/jetty/spdy/StandardStream.java | 13 ++++--- .../org/eclipse/jetty/spdy/api/Session.java | 39 ++++++++++++++----- .../org/eclipse/jetty/spdy/api/Stream.java | 29 ++++++++++---- .../jetty/spdy/api/ClientUsageTest.java | 5 ++- .../jetty/spdy/api/ServerUsageTest.java | 6 ++- .../java/org/eclipse/jetty/spdy/PingTest.java | 2 +- .../org/eclipse/jetty/spdy/SynReplyTest.java | 2 +- 8 files changed, 78 insertions(+), 39 deletions(-) diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java index e89451b66b3..649fc44b410 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -108,12 +109,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler syn(SynInfo synInfo, StreamFrameListener listener) { Promise result = new Promise<>(); - syn(synInfo, listener, result); + syn(synInfo, listener, 0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void syn(SynInfo synInfo, StreamFrameListener listener, final Handler handler) + public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, final Handler handler) { // Synchronization is necessary. // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent @@ -151,12 +152,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler rst(RstInfo rstInfo) { Promise result = new Promise<>(); - rst(rstInfo, result); + rst(rstInfo, 0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void rst(RstInfo rstInfo, Handler handler) + public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler handler) { try { @@ -182,12 +183,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler settings(SettingsInfo settingsInfo) { Promise result = new Promise<>(); - settings(settingsInfo, result); + settings(settingsInfo, 0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void settings(SettingsInfo settingsInfo, Handler handler) + public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler handler) { try { @@ -204,12 +205,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler ping() { Promise result = new Promise<>(); - ping(result); + ping(0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void ping(final Handler handler) + public void ping(long timeout, TimeUnit unit, final Handler handler) { int pingId = pingIds.getAndAdd(2); PingInfo pingInfo = new PingInfo(pingId); @@ -228,12 +229,12 @@ public class StandardSession implements ISession, Parser.Listener, Handler goAway() { Promise result = new Promise<>(); - goAway(result); + goAway(0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void goAway(Handler handler) + public void goAway(long timeout, TimeUnit unit, Handler handler) { if (goAwaySent.compareAndSet(false, true)) { diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java index bd1722f0558..40304324ed6 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/StandardStream.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.spdy.api.ByteBufferDataInfo; @@ -267,12 +268,12 @@ public class StandardStream implements IStream public Future reply(ReplyInfo replyInfo) { Promise result = new Promise<>(); - reply(replyInfo, result); + reply(replyInfo, 0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void reply(ReplyInfo replyInfo, Handler handler) + public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler handler) { try { @@ -293,12 +294,12 @@ public class StandardStream implements IStream public Future data(DataInfo dataInfo) { Promise result = new Promise<>(); - data(dataInfo, result); + data(dataInfo, 0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void data(DataInfo dataInfo, Handler handler) + public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler handler) { // Cannot update the close state here, because the data that we send may // be flow controlled, so we need the stream to update the window size. @@ -309,12 +310,12 @@ public class StandardStream implements IStream public Future headers(HeadersInfo headersInfo) { Promise result = new Promise<>(); - headers(headersInfo, result); + headers(headersInfo, 0, TimeUnit.MILLISECONDS, result); return result; } @Override - public void headers(HeadersInfo headersInfo, Handler handler) + public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler handler) { try { diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java index 4c84aa42ca1..636cb206ce0 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Session.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.spdy.api; import java.util.EventListener; import java.util.List; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** *

A {@link Session} represents the client-side endpoint of a SPDY connection to a single origin server.

@@ -71,7 +72,7 @@ public interface Session * @param synInfo the metadata to send on stream creation * @param listener the listener to invoke when events happen on the stream just created * @return a future for the stream that will be created - * @see #syn(SynInfo, StreamFrameListener, Handler) + * @see #syn(SynInfo, StreamFrameListener, long, TimeUnit, Handler) */ public Future syn(SynInfo synInfo, StreamFrameListener listener); @@ -82,10 +83,12 @@ public interface Session * * @param synInfo the metadata to send on stream creation * @param listener the listener to invoke when events happen on the stream just created + * @param timeout the operation's timeout + * @param unit the timeout's unit * @param handler the completion handler that gets notified of stream creation * @see #syn(SynInfo, StreamFrameListener) */ - public void syn(SynInfo synInfo, StreamFrameListener listener, Handler handler); + public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler handler); /** *

Sends asynchronously a RST_STREAM to abort a stream.

@@ -93,6 +96,7 @@ public interface Session * * @param rstInfo the metadata to reset the stream * @return a future to wait for the reset to be sent + * @see #rst(RstInfo, long, TimeUnit, Handler) */ public Future rst(RstInfo rstInfo); @@ -102,9 +106,12 @@ public interface Session * reset has been actually sent.

* * @param rstInfo the metadata to reset the stream - * @param handler the completion handler that gets notified of reset's send + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified of reset's send + * @see #rst(RstInfo) */ - public void rst(RstInfo rstInfo, Handler handler); + public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler handler); /** *

Sends asynchronously a SETTINGS to configure the SPDY connection.

@@ -112,6 +119,7 @@ public interface Session * * @param settingsInfo the metadata to send * @return a future to wait for the settings to be sent + * @see #settings(SettingsInfo, long, TimeUnit, Handler) */ public Future settings(SettingsInfo settingsInfo); @@ -121,15 +129,19 @@ public interface Session * settings has been actually sent.

* * @param settingsInfo the metadata to send - * @param handler the completion handler that gets notified of settings' send + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified of settings' send + * @see #settings(SettingsInfo) */ - public void settings(SettingsInfo settingsInfo, Handler handler); + public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler handler); /** *

Sends asynchronously a PING, normally to measure round-trip time.

*

Callers may use the returned future to wait for the ping to be sent.

* * @return a future for the metadata sent + * @see #ping(long, TimeUnit, Handler) */ public Future ping(); @@ -138,15 +150,19 @@ public interface Session *

Callers may pass a non-null completion handler to be notified of when the * ping has been actually sent.

* - * @param handler the completion handler that gets notified of ping's send + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified of ping's send + * @see #ping() */ - public void ping(Handler handler); + public void ping(long timeout, TimeUnit unit, Handler handler); /** *

Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.

*

Callers may use the returned future to wait for the go away to be sent.

* * @return a future to wait for the go away to be sent + * @see #goAway(long, TimeUnit, Handler) */ public Future goAway(); @@ -155,9 +171,12 @@ public interface Session *

Callers may pass a non-null completion handler to be notified of when the * go away has been actually sent.

* - * @param handler the completion handler that gets notified of go away's send + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified of go away's send + * @see #goAway() */ - public void goAway(Handler handler); + public void goAway(long timeout, TimeUnit unit, Handler handler); /** *

Initiates the flush of data to the other peer.

diff --git a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java index f12ed31dda1..2f7acfe5df2 100644 --- a/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java +++ b/spdy-core/src/main/java/org/eclipse/jetty/spdy/api/Stream.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.spdy.api; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** *

A {@link Stream} represents an bidirectional exchange of data on top of a {@link Session}.

@@ -57,6 +58,7 @@ public interface Stream * * @param replyInfo the metadata to send * @return a future to wait for the reply to be sent + * @see #reply(ReplyInfo, long, TimeUnit, Handler) * @see SessionFrameListener#onSyn(Stream, SynInfo) */ public Future reply(ReplyInfo replyInfo); @@ -67,9 +69,12 @@ public interface Stream * reply has been actually sent.

* * @param replyInfo the metadata to send - * @param handler the completion handler that gets notified of reply sent + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified of reply sent + * @see #reply(ReplyInfo) */ - public void reply(ReplyInfo replyInfo, Handler handler); + public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler handler); /** *

Sends asynchronously a DATA frame on this stream.

@@ -78,6 +83,7 @@ public interface Stream * * @param dataInfo the metadata to send * @return a future to wait for the data to be sent + * @see #data(DataInfo, long, TimeUnit, Handler) * @see #reply(ReplyInfo) */ public Future data(DataInfo dataInfo); @@ -89,9 +95,12 @@ public interface Stream * data has been actually sent.

* * @param dataInfo the metadata to send + * @param timeout the operation's timeout + * @param unit the timeout's unit * @param handler the completion handler that gets notified of data sent + * @see #data(DataInfo) */ - public void data(DataInfo dataInfo, Handler handler); + public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler handler); /** *

Sends asynchronously a HEADER frame on this stream.

@@ -100,6 +109,7 @@ public interface Stream * * @param headersInfo the metadata to send * @return a future to wait for the headers to be sent + * @see #headers(HeadersInfo, long, TimeUnit, Handler) * @see #reply(ReplyInfo) */ public Future headers(HeadersInfo headersInfo); @@ -111,9 +121,12 @@ public interface Stream * headers have been actually sent.

* * @param headersInfo the metadata to send - * @param handler the completion handler that gets notified of headers sent + * @param timeout the operation's timeout + * @param unit the timeout's unit + * @param handler the completion handler that gets notified of headers sent + * @see #headers(HeadersInfo) */ - public void headers(HeadersInfo headersInfo, Handler handler); + public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler handler); /** * @return whether this stream has been closed by both parties @@ -123,7 +136,9 @@ public interface Stream /** * @return whether this stream has been closed by one party only - * @see #isClosed() + * @see #isClosed() * @param timeout the timeout for the stream creation + * @param unit the timeout's unit + */ public boolean isHalfClosed(); @@ -135,7 +150,7 @@ public interface Stream public Object getAttribute(String key); /** - * @param key the attribute key + * @param key the attribute key * @param value an arbitrary object to associate with the given key to this stream * @see #getAttribute(String) * @see #removeAttribute(String) diff --git a/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java b/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java index 6a919d9f14f..19a4840719b 100644 --- a/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java +++ b/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ClientUsageTest.java @@ -17,6 +17,7 @@ package org.eclipse.jetty.spdy.api; import java.nio.charset.Charset; +import java.util.concurrent.TimeUnit; import org.eclipse.jetty.spdy.StandardSession; import org.junit.Ignore; @@ -82,7 +83,7 @@ public class ClientUsageTest // Then issue another similar request stream.getSession().syn(new SynInfo(true), this); } - }, new Handler.Adapter() + }, 0, TimeUnit.MILLISECONDS, new Handler.Adapter() { @Override public void completed(Stream stream) @@ -137,7 +138,7 @@ public class ClientUsageTest } } - }, new Handler.Adapter() + }, 0, TimeUnit.MILLISECONDS, new Handler.Adapter() { @Override public void completed(Stream stream) diff --git a/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java b/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java index 1976625722d..ceb9f446d13 100644 --- a/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java +++ b/spdy-core/src/test/java/org/eclipse/jetty/spdy/api/ServerUsageTest.java @@ -16,6 +16,8 @@ package org.eclipse.jetty.spdy.api; +import java.util.concurrent.TimeUnit; + import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; import org.junit.Ignore; import org.junit.Test; @@ -71,7 +73,7 @@ public class ServerUsageTest // // However, the API may allow to initiate the stream - session.syn(new SynInfo(false), null, new Handler.Adapter() + session.syn(new SynInfo(false), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter() { @Override public void completed(Stream stream) @@ -101,7 +103,7 @@ public class ServerUsageTest Session session = stream.getSession(); // Since it's unidirectional, no need to pass the listener - session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, new Handler.Adapter() + session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, 0, TimeUnit.MILLISECONDS, new Handler.Adapter() { @Override public void completed(Stream pushStream) diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PingTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PingTest.java index e23df1439a5..49be5ebcde3 100644 --- a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PingTest.java +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/PingTest.java @@ -65,7 +65,7 @@ public class PingTest extends AbstractTest @Override public void onConnect(Session session) { - session.ping(new Handler.Adapter() + session.ping(0, TimeUnit.MILLISECONDS, new Handler.Adapter() { @Override public void completed(PingInfo pingInfo) diff --git a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SynReplyTest.java b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SynReplyTest.java index 9a6a908fc8e..f382f0881d6 100644 --- a/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SynReplyTest.java +++ b/spdy-jetty/src/test/java/org/eclipse/jetty/spdy/SynReplyTest.java @@ -280,7 +280,7 @@ public class SynReplyTest extends AbstractTest Assert.assertEquals(clientData, data); clientDataLatch.countDown(); } - }, new Handler.Adapter() + }, 0, TimeUnit.MILLISECONDS, new Handler.Adapter() { @Override public void completed(Stream stream)