Updated the API to support fully asynchronous API usage.

This commit is contained in:
Simone Bordet 2012-02-15 19:04:58 +01:00
parent 433de85fda
commit 176230c344
16 changed files with 613 additions and 143 deletions

View File

@ -19,14 +19,15 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.frames.ControlFrame;
public interface ISession extends Session
{
public void control(IStream stream, ControlFrame frame) throws StreamException;
public void control(IStream stream, ControlFrame frame, Handler handler) throws StreamException;
public void data(IStream stream, DataInfo dataInfo);
public void data(IStream stream, DataInfo dataInfo, Handler handler);
public int getWindowSize();

View File

@ -0,0 +1,93 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.ResultHandler;
public class Promise<T> extends ResultHandler<T> implements Future<T>
{
private final CountDownLatch latch = new CountDownLatch(1);
private boolean cancelled;
private Throwable failure;
private T promise;
@Override
public void completed(T result)
{
fulfilled(result);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
cancelled = true;
latch.countDown();
return true;
}
@Override
public boolean isCancelled()
{
return cancelled;
}
@Override
public boolean isDone()
{
return cancelled || latch.getCount() == 0;
}
@Override
public T get() throws InterruptedException, ExecutionException
{
latch.await();
return result();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
{
latch.await(timeout, unit);
return result();
}
private T result() throws ExecutionException
{
Throwable failure = this.failure;
if (failure != null)
throw new ExecutionException(failure);
return promise;
}
public void failed(Throwable x)
{
this.failure = x;
latch.countDown();
}
public void fulfilled(T promise)
{
this.promise = promise;
latch.countDown();
}
}

View File

@ -24,12 +24,15 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
@ -101,7 +104,15 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public Stream syn(SynInfo synInfo, StreamFrameListener listener)
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener)
{
Promise<Stream> result = new Promise<>();
syn(synInfo, listener, result);
return result;
}
@Override
public void syn(SynInfo synInfo, StreamFrameListener listener, final ResultHandler<Stream> handler)
{
// Synchronization is necessary.
// SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
@ -120,111 +131,164 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
{
int streamId = streamIds.getAndAdd(2);
SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, 0, synInfo.getPriority(), synInfo.getHeaders());
IStream stream = createStream(synStream, listener);
final IStream stream = createStream(synStream, listener);
try
{
// May throw if wrong version or headers too big
control(stream, synStream);
control(stream, synStream, new Handler()
{
@Override
public void completed()
{
handler.completed(stream);
}
@Override
public void failed(Throwable x)
{
handler.failed(x);
}
});
flush();
}
catch (StreamException x)
{
removeStream(stream);
handler.failed(x);
throw new SPDYException(x);
}
return stream;
}
}
}
@Override
public void rst(RstInfo rstInfo)
public Future<Void> rst(RstInfo rstInfo)
{
Promise<Void> result = new Promise<>();
rst(rstInfo, result);
return result;
}
@Override
public void rst(RstInfo rstInfo, Handler handler)
{
try
{
// SPEC v3, 2.2.2
if (!goAwaySent.get())
if (goAwaySent.get())
{
handler.completed();
}
else
{
RstStreamFrame frame = new RstStreamFrame(version, rstInfo.getStreamId(), rstInfo.getStreamStatus().getCode(version));
control(null, frame);
control(null, frame, handler);
flush();
}
}
catch (StreamException x)
{
logger.info("Could not send reset on stream " + rstInfo.getStreamId(), x);
handler.failed(x);
}
}
@Override
public void settings(SettingsInfo settingsInfo)
public Future<Void> settings(SettingsInfo settingsInfo)
{
SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
settings(frame);
flush();
Promise<Void> result = new Promise<>();
settings(settingsInfo, result);
return result;
}
private void settings(SettingsFrame frame)
@Override
public void settings(SettingsInfo settingsInfo, Handler handler)
{
try
{
control(null, frame);
SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
control(null, frame, handler);
flush();
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
handler.failed(x);
throw new SPDYException(x);
}
}
@Override
public PingInfo ping()
public Future<PingInfo> ping()
{
int pingId = pingIds.getAndAdd(2);
PingFrame frame = new PingFrame(version, pingId);
ping(frame);
flush();
return new PingInfo(pingId);
Promise<PingInfo> result = new Promise<>();
ping(result);
return result;
}
private void ping(PingFrame frame)
@Override
public void ping(final ResultHandler<PingInfo> handler)
{
try
{
control(null, frame);
int pingId = pingIds.getAndAdd(2);
final PingInfo pingInfo = new PingInfo(pingId);
PingFrame frame = new PingFrame(version, pingId);
control(null, frame, new Handler()
{
@Override
public void completed()
{
handler.completed(pingInfo);
}
@Override
public void failed(Throwable x)
{
handler.failed(x);
}
});
flush();
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
handler.failed(x);
throw new SPDYException(x);
}
}
@Override
public void goAway()
public Future<Void> goAway()
{
Promise<Void> result = new Promise<>();
goAway(result);
return result;
}
@Override
public void goAway(Handler handler)
{
if (goAwaySent.compareAndSet(false, true))
{
if (!goAwayReceived.get())
{
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId, SessionStatus.OK.getCode());
goAway(frame);
flush();
try
{
GoAwayFrame frame = new GoAwayFrame(version, lastStreamId, SessionStatus.OK.getCode());
control(null, frame, handler);
flush();
return;
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
handler.failed(x);
throw new SPDYException(x);
}
}
}
}
private void goAway(GoAwayFrame frame)
{
try
{
control(null, frame);
}
catch (StreamException x)
{
// Should never happen, but just in case we rethrow
throw new SPDYException(x);
}
handler.completed();
}
@Override
@ -465,12 +529,19 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
private void onPing(PingFrame frame)
{
int pingId = frame.getPingId();
if (pingId % 2 == pingIds.get() % 2)
notifyOnPing(frame);
else
ping(frame);
flush();
try
{
int pingId = frame.getPingId();
if (pingId % 2 == pingIds.get() % 2)
notifyOnPing(frame);
else
control(null, frame, new Promise<>());
flush();
}
catch (StreamException x)
{
throw new SPDYException(x);
}
}
private void onGoAway(GoAwayFrame frame)
@ -593,13 +664,13 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void control(IStream stream, ControlFrame frame) throws StreamException
public void control(IStream stream, ControlFrame frame, Handler handler) throws StreamException
{
if (stream != null)
updateLastStreamId(stream);
ByteBuffer buffer = generator.control(frame);
logger.debug("Posting {} on {}", frame, stream);
enqueueLast(new ControlFrameBytes(frame, buffer));
enqueueLast(new ControlFrameBytes(frame, buffer, handler));
}
private void updateLastStreamId(IStream stream)
@ -615,10 +686,10 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
}
@Override
public void data(IStream stream, DataInfo dataInfo)
public void data(IStream stream, DataInfo dataInfo, Handler handler)
{
logger.debug("Posting {} on {}", dataInfo, stream);
enqueueLast(new DataFrameBytes(stream, dataInfo));
enqueueLast(new DataFrameBytes(stream, dataInfo, handler));
flush();
}
@ -704,11 +775,13 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
{
private final ControlFrame frame;
private final ByteBuffer buffer;
private final Handler handler;
private ControlFrameBytes(ControlFrame frame, ByteBuffer buffer)
private ControlFrameBytes(ControlFrame frame, ByteBuffer buffer, Handler handler)
{
this.frame = frame;
this.buffer = buffer;
this.handler = handler;
}
@Override
@ -726,6 +799,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
// Recipients will know the last good stream id and act accordingly.
controller.close(false);
}
handler.completed();
}
@Override
@ -739,12 +813,14 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
{
private final IStream stream;
private final DataInfo data;
private final Handler handler;
private int dataLength;
private DataFrameBytes(IStream stream, DataInfo data)
private DataFrameBytes(IStream stream, DataInfo data, Handler handler)
{
this.stream = stream;
this.data = data;
this.handler = handler;
}
@Override
@ -776,6 +852,7 @@ public class StandardSession implements ISession, Parser.Listener, ISession.Cont
stream.updateCloseState(data.isClose());
if (stream.isClosed())
removeStream(stream);
handler.completed();
}
}

View File

@ -19,10 +19,12 @@ package org.eclipse.jetty.spdy;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Handler;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
@ -203,7 +205,7 @@ public class StandardStream implements IStream
// we will send many window update frames... perhaps we can delay
// window update frames until we have a bigger delta to send
WindowUpdateFrame windowUpdateFrame = new WindowUpdateFrame(session.getVersion(), getId(), delta);
session.control(this, windowUpdateFrame);
session.control(this, windowUpdateFrame, new Promise<>());
}
}
catch (StreamException x)
@ -265,41 +267,67 @@ public class StandardStream implements IStream
}
@Override
public void reply(ReplyInfo replyInfo)
public Future<Void> reply(ReplyInfo replyInfo)
{
Promise<Void> result = new Promise<>();
reply(replyInfo, result);
return result;
}
@Override
public void reply(ReplyInfo replyInfo, Handler handler)
{
try
{
updateCloseState(replyInfo.isClose());
SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
session.control(this, frame);
session.control(this, frame, handler);
}
catch (StreamException x)
{
logger.debug("Could not send reply on stream " + this, x);
handler.failed(x);
session.rst(new RstInfo(getId(), x.getStreamStatus()));
}
}
@Override
public void data(DataInfo dataInfo)
public Future<Void> data(DataInfo dataInfo)
{
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo);
Promise<Void> result = new Promise<>();
data(dataInfo, result);
return result;
}
@Override
public void headers(HeadersInfo headersInfo)
public void data(DataInfo dataInfo, Handler handler)
{
// Cannot update the close state here, because the data that we send may
// be flow controlled, so we need the stream to update the window size.
session.data(this, dataInfo, handler);
}
@Override
public Future<Void> headers(HeadersInfo headersInfo)
{
Promise<Void> result = new Promise<>();
headers(headersInfo, result);
return result;
}
@Override
public void headers(HeadersInfo headersInfo, Handler handler)
{
try
{
updateCloseState(headersInfo.isClose());
HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
session.control(this, frame);
session.control(this, frame, handler);
}
catch (StreamException x)
{
logger.debug("Could not send headers on stream " + this, x);
handler.failed(x);
session.rst(new RstInfo(getId(), x.getStreamStatus()));
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.api;
import java.util.concurrent.TimeUnit;
public abstract class Handler
{
private final int timeout;
private final TimeUnit timeUnit;
protected Handler()
{
this(0, TimeUnit.MILLISECONDS);
}
protected Handler(int timeout, TimeUnit timeUnit)
{
this.timeout = timeout;
this.timeUnit = timeUnit;
}
public abstract void completed();
public void failed(Throwable x)
{
throw new SPDYException(x);
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright (c) 2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.jetty.spdy.api;
public abstract class ResultHandler<R> extends Handler
{
@Override
public final void completed()
{
completed(null);
}
public abstract void completed(R result);
}

View File

@ -18,6 +18,7 @@ package org.eclipse.jetty.spdy.api;
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.Future;
/**
* <p>A {@link Session} represents the client-side endpoint of a SPDY connection to a single origin server.</p>
@ -63,39 +64,100 @@ public interface Session
public void removeListener(Listener listener);
/**
* <p>Sends a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
* <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
* <p>Callers may use the returned future to wait for the stream to be created, and
* use the stream, for example, to send data frames.</p>
*
* @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created
* @return the stream just created
* @return a future for the stream that will be created
* @see #syn(SynInfo, StreamFrameListener, ResultHandler)
*/
public Stream syn(SynInfo synInfo, StreamFrameListener listener);
public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener);
/**
* <p>Sends a RST_STREAM to abort a stream.</p>
* <p>Sends asynchronously a SYN_FRAME to create a new {@link Stream SPDY stream}.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* stream has been created and use the stream, for example, to send data frames.</p>
*
* @param synInfo the metadata to send on stream creation
* @param listener the listener to invoke when events happen on the stream just created
* @param handler the completion handler that gets notified of stream creation
* @see #syn(SynInfo, StreamFrameListener)
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, ResultHandler<Stream> handler);
/**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
* <p>Callers may use the returned future to wait for the reset to be sent.</p>
*
* @param rstInfo the metadata to reset the stream
* @return a future to wait for the reset to be sent
*/
public void rst(RstInfo rstInfo);
public Future<Void> rst(RstInfo rstInfo);
/**
* <p>Sends a SETTINGS to configure the SPDY connection.</p>
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* reset has been actually sent.</p>
*
* @param rstInfo the metadata to reset the stream
* @param handler the completion handler that gets notified of reset's send
*/
public void rst(RstInfo rstInfo, Handler handler);
/**
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
* <p>Callers may use the returned future to wait for the settings to be sent.</p>
*
* @param settingsInfo the metadata to send
* @return a future to wait for the settings to be sent
*/
public void settings(SettingsInfo settingsInfo);
public Future<Void> settings(SettingsInfo settingsInfo);
/**
* <p>Sends a PING, normally to measure round-trip time.</p>
* <p>Sends asynchronously a SETTINGS to configure the SPDY connection.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* settings has been actually sent.</p>
*
* @return the metadata sent
* @param settingsInfo the metadata to send
* @param handler the completion handler that gets notified of settings' send
*/
public PingInfo ping();
public void settings(SettingsInfo settingsInfo, Handler handler);
/**
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
* <p>Callers may use the returned future to wait for the ping to be sent.</p>
*
* @return a future for the metadata sent
*/
public Future<PingInfo> ping();
/**
* <p>Sends asynchronously a PING, normally to measure round-trip time.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* ping has been actually sent.</p>
*
* @param handler the completion handler that gets notified of ping's send
*/
public void ping(ResultHandler<PingInfo> handler);
/**
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
* <p>Callers may use the returned future to wait for the go away to be sent.</p>
*
* @return a future to wait for the go away to be sent
*/
public void goAway();
public Future<Void> goAway();
/**
* <p>Closes gracefully this session, sending a GO_AWAY frame and then closing the TCP connection.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* go away has been actually sent.</p>
*
* @param handler the completion handler that gets notified of go away's send
*/
public void goAway(Handler handler);
/**
* <p>Initiates the flush of data to the other peer.</p>
@ -151,4 +213,27 @@ public interface Session
}
}
}
/*
public static abstract class SynHandler extends Promise<Stream>
{
@Override
public final void completed()
{
// Applications should not override this method, but the one below
}
public abstract void completed(Stream stream);
}
public static abstract class PingHandler extends Promise<PingInfo>
{
@Override
public final void completed()
{
// Applications should not override this method, but the one below
}
public abstract void completed(PingInfo stream);
}
*/
}

View File

@ -16,6 +16,8 @@
package org.eclipse.jetty.spdy.api;
import java.util.concurrent.Future;
/**
* <p>A {@link Stream} represents an bidirectional exchange of data on top of a {@link Session}.</p>
* <p>Differently from socket streams, where the input and output streams are permanently associated
@ -50,30 +52,68 @@ public interface Stream
public Session getSession();
/**
* <p>Sends a SYN_REPLY frame in response to a SYN_STREAM frame.</p>
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p>
* <p>Callers may use the returned future to wait for the reply to be actually sent.</p>
*
* @param replyInfo the metadata to send
* @return a future to wait for the reply to be sent
* @see SessionFrameListener#onSyn(Stream, SynInfo)
*/
public void reply(ReplyInfo replyInfo);
public Future<Void> reply(ReplyInfo replyInfo);
/**
* <p>Sends a DATA frame on this stream.</p>
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* reply has been actually sent.</p>
*
* @param replyInfo the metadata to send
* @param handler the completion handler that gets notified of reply sent
*/
public void reply(ReplyInfo replyInfo, Handler handler);
/**
* <p>Sends asynchronously a DATA frame on this stream.</p>
* <p>DATA frames should always be sent after a SYN_REPLY frame.</p>
* <p>Callers may use the returned future to wait for the data to be actually sent.</p>
*
* @param dataInfo the metadata to send
* @return a future to wait for the data to be sent
* @see #reply(ReplyInfo)
*/
public void data(DataInfo dataInfo);
public Future<Void> data(DataInfo dataInfo);
/**
* <p>Sends a HEADER frame on this stream.</p>
* <p>Sends asynchronously a DATA frame on this stream.</p>
* <p>DATA frames should always be sent after a SYN_REPLY frame.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* data has been actually sent.</p>
*
* @param dataInfo the metadata to send
* @param handler the completion handler that gets notified of data sent
*/
public void data(DataInfo dataInfo, Handler handler);
/**
* <p>Sends asynchronously a HEADER frame on this stream.</p>
* <p>HEADERS frames should always be sent after a SYN_REPLY frame.</p>
* <p>Callers may use the returned future to wait for the headers to be actually sent.</p>
*
* @param headersInfo the metadata to send
* @return a future to wait for the headers to be sent
* @see #reply(ReplyInfo)
*/
public void headers(HeadersInfo headersInfo);
public Future<Void> headers(HeadersInfo headersInfo);
/**
* <p>Sends asynchronously a HEADER frame on this stream.</p>
* <p>HEADERS frames should always be sent after a SYN_REPLY frame.</p>
* <p>Callers may pass a non-null completion handler to be notified of when the
* headers have been actually sent.</p>
*
* @param headersInfo the metadata to send
* @param handler the completion handler that gets notified of headers sent
*/
public void headers(HeadersInfo headersInfo, Handler handler);
/**
* @return whether this stream has been closed by both parties

View File

@ -30,7 +30,7 @@ public class ClientUsageTest
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
session.syn(new SynInfo(true), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
@ -45,60 +45,107 @@ public class ClientUsageTest
}
@Test
public void testClientRequestWithBodyAndResponseWithBody() throws Exception
public void testClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
// The good of passing the listener here is that you can safely accumulate info
// from the headers to be used in the data, e.g. content-type, charset
// In BWTP the listener was attached to the session, not passed to syn(), so could
// not accumulate if not adding attributes to the stream (which is a good idea anyway)
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
stream.getSession().syn(new SynInfo(true), this);
}
}).get();
// Send-and-forget the data
stream.data(new StringDataInfo("data", true));
}
@Test
public void testAsyncClientRequestWithBodyResponseNoBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
final String context = "context";
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Do something with the response
replyInfo.getHeaders().get("host");
// Then issue another similar request
stream.getSession().syn(new SynInfo(true), this);
}
}, new ResultHandler<Stream>()
{
@Override
public void completed(Stream stream)
{
// Differently from JDK 7 AIO, there is no need to
// have an explicit parameter for the context since
// that is captured while the handler is created anyway,
// and it is used only by the handler as parameter
// The style below is fire-and-forget, since
// we do not pass the handler nor we call get()
// to wait for the data to be sent
stream.data(new StringDataInfo(context, true));
}
});
}
@Test
public void testAsyncClientRequestWithBodyAndResponseWithBody() throws Exception
{
Session session = new StandardSession(SPDY.V2, null, 1, null, null);
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
// The good of passing the listener to syn() is that applications can safely
// accumulate info from the reply headers to be used in the data callback,
// e.g. content-type, charset, etc.
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// This style is similar to the new async channel API in JDK 7
// Do something with the response
int contentLength = replyInfo.getHeaders().get("content-length").valueAsInt();
// stream.setAttribute("content-length", contentLength);
Headers headers = replyInfo.getHeaders();
int contentLength = headers.get("content-length").valueAsInt();
stream.setAttribute("content-length", contentLength);
if (!replyInfo.isClose())
stream.setAttribute("builder", new StringBuilder());
// Then issue another similar request
// May issue another similar request while waiting for data
stream.getSession().syn(new SynInfo(true), this);
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
// StringBuilder builder = new StringBuilder();
// builder.append(Charset.forName("UTF-8").decode(data));
StringBuilder builder = (StringBuilder)stream.getAttribute("builder");
builder.append(dataInfo.asString("UTF-8"));
if (dataInfo.isClose())
{
// System.err.println("data = " + builder);
// assert builder.toString().getBytes().length == stream.getAttribute("content-length");
int receivedLength = builder.toString().getBytes(Charset.forName("UTF-8")).length;
assert receivedLength == (Integer)stream.getAttribute("content-length");
}
}
}, new ResultHandler<Stream>()
{
@Override
public void completed(Stream stream)
{
stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false));
stream.data(new StringDataInfo("foo", false));
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), true));
}
});
stream.data(new BytesDataInfo("wee".getBytes(Charset.forName("UTF-8")), false));
stream.data(new StringDataInfo("foo", false));
stream.data(new ByteBufferDataInfo(Charset.forName("UTF-8").encode("bar"), false));
// stream.data(new InputStreamDataInfo(new ByteArrayInputStream("baz".getBytes(Charset.forName("UTF-8"))), false));
//
// In CometD the style is different, but in bayeux the frame IS the message,
// while in SPDY the message is composed of several frames of different types
// e.g. synReply+data vs a single bayeux message
// That is why the listeners in Bayeux are simpler: you can only receive messages
// However, we can mimic Bayeux's behavior with SPDY if we add another layer on top of it
// that produces a Message that has an input stream (so that arbitrarily long bodies can be
// read without exhausting the memory).
}
}

View File

@ -32,7 +32,8 @@ public class ServerUsageTest
public StreamFrameListener onSyn(Stream stream, SynInfo streamInfo)
{
Headers synHeaders = streamInfo.getHeaders();
// Do something with headers, for example extract them and perform an http request via Jetty's LocalConnector
// Do something with headers, for example extract them and
// perform an http request via Jetty's LocalConnector
// Get the http response, fill headers and data
Headers replyHeaders = new Headers();
@ -68,17 +69,21 @@ public class ServerUsageTest
// C <-- SYN_REPLY(id=1) --- S
// C <-- SYN_STREAM(id=2,uni,assId=1) --- S
//
// However, the API may allow to initiate the stream like in bwtp
// However, the API may allow to initiate the stream
SynInfo synInfo = new SynInfo(new Headers(), false, true, 0, (byte)0);
Stream stream = session.syn(synInfo, null);
// The point here is that we have no idea if the client accepted our stream
// So we return a stream, we may be able to send the headers frame, but later
// the client sends a rst frame.
// We have to atomically set some flag on the stream to signal it's closed
// and any operation on it will throw
stream.headers(new HeadersInfo(new Headers(), false, false));
session.syn(new SynInfo(false), null, new ResultHandler<Stream>()
{
@Override
public void completed(Stream stream)
{
// The point here is that we have no idea if the client accepted our stream
// So we return a stream, we may be able to send the headers frame, but later
// the client sends a rst frame.
// We have to atomically set some flag on the stream to signal it's closed
// and any operation on it will throw
stream.headers(new HeadersInfo(new Headers(), true));
}
});
}
};
}
@ -91,10 +96,19 @@ public class ServerUsageTest
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo streamInfo)
{
// Need to send the reply first
stream.reply(new ReplyInfo(false));
Session session = stream.getSession();
// Since it's unidirectional, no need to pass the listener
Stream pushStream = session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null);
pushStream.data(new StringDataInfo("foo", false));
session.syn(new SynInfo(new Headers(), false, true, stream.getId(), (byte)0), null, new ResultHandler<Stream>()
{
@Override
public void completed(Stream pushStream)
{
pushStream.data(new StringDataInfo("foo", false));
}
});
return null;
}
};

View File

@ -284,7 +284,7 @@ public class ServerHTTPSPDYTest
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
}).get();
stream.data(new StringDataInfo(data, true));
Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS));
@ -329,7 +329,7 @@ public class ServerHTTPSPDYTest
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
}).get();
// Sleep between the data frames so that they will be read in 2 reads
stream.data(new StringDataInfo(data1, false));
Thread.sleep(1000);
@ -377,7 +377,7 @@ public class ServerHTTPSPDYTest
Assert.assertTrue(replyHeaders.get("status").value().contains("200"));
replyLatch.countDown();
}
});
}).get();
// Send the data frames consecutively, so the server reads both frames in one read
stream.data(new StringDataInfo(data1, false));
stream.data(new StringDataInfo(data2, true));

View File

@ -105,7 +105,7 @@ public class ConcurrentSynDataReplyDataTest extends AbstractTest
threadPool.shutdown();
}
private void process(Session session, Headers headers, int iterations) throws InterruptedException
private void process(Session session, Headers headers, int iterations) throws Exception
{
for (int i = 0; i < iterations; ++i)
{
@ -128,7 +128,7 @@ public class ConcurrentSynDataReplyDataTest extends AbstractTest
Assert.assertTrue(dataInfo.isConsumed());
latch.countDown();
}
});
}).get();
stream.data(new StringDataInfo("data_" + stream.getId(), true));
Assert.assertTrue("process() failed for stream=" + stream.getId(), latch.await(5, TimeUnit.SECONDS));
}

View File

@ -122,7 +122,7 @@ public class FlowControlTest extends AbstractTest
}
}), null);
Stream stream = session.syn(new SynInfo(true), null);
Stream stream = session.syn(new SynInfo(true), null).get();
int length = 128 * 1024;
stream.data(new BytesDataInfo(new byte[length], true));
@ -154,7 +154,7 @@ public class FlowControlTest extends AbstractTest
}
}), null);
Stream stream = session.syn(new SynInfo(true), null);
Stream stream = session.syn(new SynInfo(true), null).get();
int length = 128 * 1024;
stream.data(new BytesDataInfo(new byte[length], false));
stream.data(new BytesDataInfo(new byte[length], true));

View File

@ -96,7 +96,7 @@ public class GoAwayTest extends AbstractTest
};
Session session = startClient(startServer(serverSessionFrameListener), clientSessionFrameListener);
Stream stream1 = session.syn(new SynInfo(true), null);
Stream stream1 = session.syn(new SynInfo(true), null).get();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
GoAwayInfo goAwayInfo = ref.get();
@ -201,11 +201,11 @@ public class GoAwayTest extends AbstractTest
{
reply1Latch.countDown();
}
});
}).get();
Assert.assertTrue(reply1Latch.await(5, TimeUnit.SECONDS));
// Second stream is closed in the middle
Stream stream2 = session.syn(new SynInfo(false), null);
Stream stream2 = session.syn(new SynInfo(false), null).get();
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
// There is a race between the data we want to send, and the client

View File

@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
@ -44,7 +45,7 @@ public class PingTest extends AbstractTest
}
};
Session session = startClient(startServer(null), clientSessionFrameListener);
PingInfo pingInfo = session.ping();
PingInfo pingInfo = session.ping().get();
Assert.assertEquals(1, pingInfo.getPingId() % 2);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -64,8 +65,14 @@ public class PingTest extends AbstractTest
@Override
public void onConnect(Session session)
{
PingInfo pingInfo = session.ping();
this.pingId = pingInfo.getPingId();
session.ping(new ResultHandler<PingInfo>()
{
@Override
public void completed(PingInfo pingInfo)
{
pingId = pingInfo.getPingId();
}
});
}
@Override

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.Headers;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.ResultHandler;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
@ -94,7 +95,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertTrue(stream.isClosed());
replyLatch.countDown();
}
});
}).get();
Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
Session serverSession = sessionRef.get();
@ -176,7 +177,7 @@ public class SynReplyTest extends AbstractTest
Assert.assertTrue(stream.isClosed());
replyLatch.countDown();
}
});
}).get();
stream.data(new BytesDataInfo(dataBytes, true));
Assert.assertTrue(synLatch.await(5, TimeUnit.SECONDS));
@ -264,7 +265,7 @@ public class SynReplyTest extends AbstractTest
@Override
public void onConnect(Session session)
{
Stream stream = session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
session.syn(new SynInfo(false), new StreamFrameListener.Adapter()
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
@ -279,8 +280,14 @@ public class SynReplyTest extends AbstractTest
Assert.assertEquals(clientData, data);
clientDataLatch.countDown();
}
}, new ResultHandler<Stream>()
{
@Override
public void completed(Stream stream)
{
stream.data(new StringDataInfo(serverData, true));
}
});
stream.data(new StringDataInfo(serverData, true));
}
};
@ -345,7 +352,7 @@ public class SynReplyTest extends AbstractTest
};
Session session = startClient(startServer(serverSessionFrameListener), null);
Stream stream = session.syn(new SynInfo(true), null);
Stream stream = session.syn(new SynInfo(true), null).get();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
RstInfo rstInfo = ref.get();