394210 spdy api rename stream.syn() to stream.push()

This commit is contained in:
Thomas Becker 2012-12-21 18:13:54 +01:00
parent 55d8088f05
commit e1a663865c
19 changed files with 199 additions and 78 deletions

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.spdy;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
/* ------------------------------------------------------------ */
@ -30,8 +31,8 @@ public class PushSynInfo extends SynInfo
private int associatedStreamId;
public PushSynInfo(int associatedStreamId, SynInfo synInfo){
super(synInfo.getHeaders(), synInfo.isClose(), synInfo.getPriority());
public PushSynInfo(int associatedStreamId, PushInfo pushInfo){
super(pushInfo.getHeaders(), pushInfo.isClose());
this.associatedStreamId = associatedStreamId;
}

View File

@ -28,12 +28,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.HeadersFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
@ -335,18 +335,18 @@ public class StandardStream implements IStream
}
@Override
public Stream syn(SynInfo synInfo) throws InterruptedException, ExecutionException, TimeoutException
public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
{
FuturePromise<Stream> result = new FuturePromise<>();
syn(synInfo, result);
if (synInfo.getTimeout() > 0)
return result.get(synInfo.getTimeout(), synInfo.getUnit());
push(pushInfo, result);
if (pushInfo.getTimeout() > 0)
return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
else
return result.get();
}
@Override
public void syn(SynInfo synInfo, Promise<Stream> promise)
public void push(PushInfo pushInfo, Promise<Stream> promise)
{
if (isClosed() || isReset())
{
@ -354,7 +354,7 @@ public class StandardStream implements IStream
"Stream: " + this + " already closed or reset!"));
return;
}
PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
session.syn(pushSynInfo, null, promise);
}

View File

@ -0,0 +1,101 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.spdy.api;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Fields;
/**
* <p>A container for PUSH_SYN_STREAM frames metadata and data.</p>
*/
public class PushInfo extends Info
{
/**
* <p>Flag that indicates that this {@link PushInfo} is the last frame in the stream.</p>
*
* @see #isClose()
* @see #getFlags()
*/
public static final byte FLAG_CLOSE = 1;
private final boolean close;
private final Fields headers;
/**
* <p>Creates a {@link PushInfo} instance with the given headers and the given close flag,
* not unidirectional, without associated stream, and with default priority.</p>
*
* @param headers the {@link Fields}
* @param close the value of the close flag
*/
public PushInfo(Fields headers, boolean close)
{
this(0, TimeUnit.SECONDS, headers, close);
// either builder or setters for timeout
}
/**
* <p>
* Creates a {@link PushInfo} instance with the given headers, the given close flag and with the given priority.
* </p>
* @param timeout the timeout value
* @param unit the TimeUnit of the timeout
* @param headers
* the {@link Fields}
* @param close
*/
public PushInfo(long timeout, TimeUnit unit, Fields headers, boolean close)
{
super(timeout, unit);
this.close = close;
this.headers = headers;
}
/**
* @return the value of the close flag
*/
public boolean isClose()
{
return close;
}
/**
* @return the {@link Fields}
*/
public Fields getHeaders()
{
return headers;
}
/**
* @return the close flag as integer
* @see #FLAG_CLOSE
*/
public byte getFlags()
{
return isClose() ? FLAG_CLOSE : 0;
}
@Override
public String toString()
{
return String.format("SYN push close=%b headers=%s", close, headers);
}
}

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.util.Promise;
* <pre>
* Session session = ...;
* SynInfo synInfo = new SynInfo(true);
* session.syn(synInfo, new Stream.FrameListener.Adapter()
* session.push(synInfo, new Stream.FrameListener.Adapter()
* {
* public void onReply(Stream stream, ReplyInfo replyInfo)
* {
@ -96,7 +96,6 @@ public interface Session
*/
public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise);
/**
* <p>Sends asynchronously a RST_STREAM to abort a stream.</p>
* <p>Callers may use the returned future to wait for the reset to be sent.</p>

View File

@ -96,22 +96,24 @@ public interface Stream
* <p>Callers may use the returned future to get the pushstream once it got created</p>
*
*
* @param synInfo the metadata to send on stream creation
*
* @param pushInfo the metadata to send on stream creation
* @return a future containing the stream once it got established
* @see #syn(SynInfo, Promise)
* @see #push(PushInfo, Promise
*/
public Stream syn(SynInfo synInfo) throws InterruptedException, ExecutionException, TimeoutException;
public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException;
/**
* <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p>
* <p>Callers may pass a non-null completion callback to be notified of when the
* pushstream has been established.</p>
*
* @param synInfo the metadata to send on stream creation
*
* @param pushInfo the metadata to send on stream creation
* @param callback the completion callback that gets notified once the pushstream is established
* @see #syn(SynInfo)
* @see #push(PushInfo)
*/
public void syn(SynInfo synInfo, Promise<Stream> callback);
public void push(PushInfo pushInfo, Promise<Stream> callback);
/**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p>

View File

@ -28,7 +28,7 @@ import org.eclipse.jetty.util.Fields;
public class SynInfo extends Info
{
/**
* <p>Flag that indicates that this {@link DataInfo} is the last frame in the stream.</p>
* <p>Flag that indicates that this {@link SynInfo} is the last frame in the stream.</p>
*
* @see #isClose()
* @see #getFlags()
@ -40,7 +40,7 @@ public class SynInfo extends Info
private final Fields headers;
/**
* <p>Creates a {@link ReplyInfo} instance with the given headers and the given close flag,
* <p>Creates a {@link SynInfo} instance with the given headers and the given close flag,
* not unidirectional, without associated stream, and with default priority.</p>
*
* @param headers the {@link Fields}
@ -54,7 +54,7 @@ public class SynInfo extends Info
/**
* <p>
* Creates a {@link ReplyInfo} instance with the given headers, the given close flag and with the given priority.
* Creates a {@link SynInfo} instance with the given headers, the given close flag and with the given priority.
* </p>
* @param headers
* the {@link Fields}
@ -69,7 +69,7 @@ public class SynInfo extends Info
/**
* <p>
* Creates a {@link ReplyInfo} instance with the given headers, the given close flag and with the given priority.
* Creates a {@link SynInfo} instance with the given headers, the given close flag and with the given priority.
* </p>
* @param timeout the timeout value
* @param unit the TimeUnit of the timeout

View File

@ -34,6 +34,7 @@ import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
@ -225,8 +226,8 @@ public class StandardSessionTest
private void createPushStreamAndMakeSureItFails(IStream stream) throws InterruptedException
{
final CountDownLatch failedLatch = new CountDownLatch(1);
SynInfo synInfo = new SynInfo(5, TimeUnit.SECONDS, headers, false, stream.getPriority());
stream.syn(synInfo, new Promise.Adapter<Stream>()
PushInfo pushInfo = new PushInfo(5, TimeUnit.SECONDS, headers, false);
stream.push(pushInfo, new Promise.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
@ -259,7 +260,7 @@ public class StandardSessionTest
setControllerWriteExpectation(false);
IStream stream = createStream();
IStream pushStream = (IStream)stream.syn(new SynInfo(new Fields(), false));
IStream pushStream = (IStream)stream.push(new PushInfo(new Fields(), false));
assertThatPushStreamIsInSession(pushStream);
session.rst(new RstInfo(pushStream.getId(), StreamStatus.INVALID_STREAM));
assertThatPushStreamIsNotInSession(pushStream);
@ -273,8 +274,8 @@ public class StandardSessionTest
setControllerWriteExpectation(false);
IStream stream = createStream();
SynInfo synInfo = new SynInfo(5, TimeUnit.SECONDS, headers, true, stream.getPriority());
IStream pushStream = (IStream)stream.syn(synInfo);
PushInfo pushInfo = new PushInfo(5, TimeUnit.SECONDS, headers, true);
IStream pushStream = (IStream)stream.push(pushInfo);
assertThatPushStreamIsHalfClosed(pushStream);
assertThatPushStreamIsClosed(pushStream);
assertThatStreamIsNotAssociatedWithPushStream(stream, pushStream);
@ -288,8 +289,8 @@ public class StandardSessionTest
setControllerWriteExpectation(false);
IStream stream = createStream();
SynInfo synInfo = new SynInfo(5, TimeUnit.SECONDS, headers, false, stream.getPriority());
IStream pushStream = (IStream)stream.syn(synInfo);
PushInfo pushInfo = new PushInfo(5, TimeUnit.SECONDS, headers, false);
IStream pushStream = (IStream)stream.push(pushInfo);
assertThatStreamIsAssociatedWithPushStream(stream, pushStream);
assertThatPushStreamIsInSession(pushStream);
pushStream.headers(new HeadersInfo(headers, true));
@ -543,8 +544,8 @@ public class StandardSessionTest
private IStream createPushStream(Stream stream) throws InterruptedException, ExecutionException, TimeoutException
{
SynInfo synInfo = new SynInfo(5, TimeUnit.SECONDS, headers, false, stream.getPriority());
return (IStream)stream.syn(synInfo);
PushInfo pushInfo = new PushInfo(5, TimeUnit.SECONDS, headers, false);
return (IStream)stream.push(pushInfo);
}
private void assertThatStreamIsClosed(IStream stream)

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
@ -60,7 +61,7 @@ public class StandardStreamTest
private SynStreamFrame synStreamFrame;
/**
* Test method for {@link org.eclipse.jetty.spdy.StandardStream#syn(org.eclipse.jetty.spdy.api.SynInfo)}.
* Test method for {@link Stream#push(org.eclipse.jetty.spdy.api.PushInfo)}.
*/
@SuppressWarnings("unchecked")
@Test
@ -70,28 +71,29 @@ public class StandardStreamTest
Set<Stream> streams = new HashSet<>();
streams.add(stream);
when(synStreamFrame.isClose()).thenReturn(false);
SynInfo synInfo = new SynInfo(new Fields(), false);
PushInfo pushInfo = new PushInfo(new Fields(), false);
when(session.getStreams()).thenReturn(streams);
stream.syn(synInfo, new Promise.Adapter<Stream>());
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(), synInfo)), any(StreamFrameListener.class), any(Promise.class));
stream.push(pushInfo, new Promise.Adapter<Stream>());
verify(session).syn(argThat(new PushSynInfoMatcher(stream.getId(), pushInfo)),
any(StreamFrameListener.class), any(Promise.class));
}
private class PushSynInfoMatcher extends ArgumentMatcher<PushSynInfo>
{
int associatedStreamId;
SynInfo synInfo;
private int associatedStreamId;
private PushInfo pushInfo;
public PushSynInfoMatcher(int associatedStreamId, SynInfo synInfo)
public PushSynInfoMatcher(int associatedStreamId, PushInfo pushInfo)
{
this.associatedStreamId = associatedStreamId;
this.synInfo = synInfo;
this.pushInfo = pushInfo;
}
@Override
public boolean matches(Object argument)
{
PushSynInfo pushSynInfo = (PushSynInfo)argument;
return pushSynInfo.getAssociatedStreamId() == associatedStreamId && pushSynInfo.isClose() == synInfo.isClose();
return pushSynInfo.getAssociatedStreamId() == associatedStreamId && pushSynInfo.isClose() == pushInfo.isClose();
}
}
@ -103,7 +105,7 @@ public class StandardStreamTest
stream.updateCloseState(true, false);
assertThat("stream expected to be closed", stream.isClosed(), is(true));
final CountDownLatch failedLatch = new CountDownLatch(1);
stream.syn(new SynInfo(1, TimeUnit.SECONDS, new Fields(), false, (byte)0), new Promise.Adapter<Stream>()
stream.push(new PushInfo(1, TimeUnit.SECONDS, new Fields(), false), new Promise.Adapter<Stream>()
{
@Override
public void failed(Throwable x)

View File

@ -137,7 +137,7 @@ public class ClientUsageTest
session.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
{
// The good of passing the listener to syn() is that applications can safely
// The good of passing the listener to push() is that applications can safely
// accumulate info from the reply headers to be used in the data callback,
// e.g. content-type, charset, etc.

View File

@ -37,11 +37,11 @@ import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.spdy.StreamException;
import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamStatus;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.util.BlockingCallback;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -215,8 +215,7 @@ public class HttpTransportOverSPDY implements HttpTransport
final Fields pushRequestHeaders = createRequestHeaders(scheme, host, uri, pushResource);
// TODO: handle the timeout better
stream.syn(new SynInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false, (byte)0),
new Promise.Adapter<Stream>()
stream.push(new PushInfo(0, TimeUnit.MILLISECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)

View File

@ -39,9 +39,9 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link ProxyEngineSelector} is the main entry point for syn stream events of a jetty SPDY proxy. It receives the
* syn stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target
* host and forwards the syn to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p>
* <p>{@link ProxyEngineSelector} is the main entry point for push stream events of a jetty SPDY proxy. It receives the
* push stream frames from the clients, checks if there's an appropriate {@link ProxyServerInfo} for the given target
* host and forwards the push to a {@link ProxyEngine} for the protocol defined in {@link ProxyServerInfo}.</p>
*
* <p>If no {@link ProxyServerInfo} can be found for the given target host or no {@link ProxyEngine} can be found for
* the given protocol, it resets the client stream.</p>

View File

@ -44,6 +44,7 @@ import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SessionStatus;
@ -210,7 +211,7 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
}
@Override
public void syn(SynInfo synInfo, Promise<Stream> handler)
public void push(PushInfo pushInfo, Promise<Stream> handler)
{
// HTTP does not support pushed streams
handler.succeeded(new HTTPPushStream(2, getPriority(), getSession(), this));

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.HeadersInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
@ -460,8 +461,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
serverStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.syn(new SynInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose(),
(byte)0), handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, serverSynInfo.isClose()),
handler);
return this;
}

View File

@ -132,11 +132,11 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
};
}
});
// Send main request. That should initiate the push syn's which get reset by the client
// Send main request. That should initiate the push push's which get reset by the client
sendRequest(session, mainRequestHeaders);
assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false));
assertThat("Push syn headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
sendRequest(session, associatedCSSRequestHeaders);
}
@ -288,7 +288,7 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
else
assertThat("No push data is received", pushDataLatch.await(1, TimeUnit.SECONDS), is(false));
if (validateHeaders)
assertThat("Push syn headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
assertThat("Push push headers valid", pushSynHeadersValid.await(5, TimeUnit.SECONDS), is(true));
}
@Test

View File

@ -38,6 +38,7 @@ import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.PingInfo;
import org.eclipse.jetty.spdy.api.PingResultInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
@ -105,7 +106,7 @@ public class ProxyHTTPSPDYTest
{
server = new Server();
SPDYServerConnector serverConnector = new SPDYServerConnector(server, listener);
serverConnector.addConnectionFactory(new SPDYServerConnectionFactory(version,listener));
serverConnector.addConnectionFactory(new SPDYServerConnectionFactory(version, listener));
serverConnector.setPort(0);
server.addConnector(serverConnector);
server.start();
@ -567,7 +568,7 @@ public class ProxyHTTPSPDYTest
Fields pushHeaders = new Fields();
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
stream.syn(new SynInfo(5, TimeUnit.SECONDS, pushHeaders, false, (byte)0), new Promise.Adapter<Stream>()
stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
@ -620,7 +621,7 @@ public class ProxyHTTPSPDYTest
Fields pushHeaders = new Fields();
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
stream.syn(new SynInfo(5, TimeUnit.SECONDS, pushHeaders, false, (byte)0), new Promise.Adapter<Stream>()
stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)

View File

@ -225,7 +225,7 @@ public class ClosedStreamTest extends AbstractTest
socketChannel.write(synData);
assertThat("synData is fully written", synData.hasRemaining(), is(false));
assertThat("server: syn reply is sent",serverReplySentLatch.await(5,TimeUnit.SECONDS),is(true));
assertThat("server: push reply is sent",serverReplySentLatch.await(5,TimeUnit.SECONDS),is(true));
Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor());
parser.addListener(new Listener.Adapter()

View File

@ -18,17 +18,17 @@
package org.eclipse.jetty.spdy.server;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDYException;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionFrameListener;
import org.eclipse.jetty.spdy.api.SessionStatus;
@ -221,9 +221,9 @@ public class GoAwayTest extends AbstractTest
stream2.data(new StringDataInfo("foo", true));
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
}
catch (SPDYException x)
catch (ExecutionException x)
{
Assert.assertThat(x.getCause(), CoreMatchers.instanceOf(ClosedChannelException.class));
Assert.assertThat(x.getCause(), CoreMatchers.instanceOf(EofException.class));
}
// The last good stream is the second, because it was received by the server

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayReceivedInfo;
import org.eclipse.jetty.spdy.api.PushInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.RstInfo;
import org.eclipse.jetty.spdy.api.SPDY;
@ -63,6 +64,8 @@ import org.eclipse.jetty.spdy.parser.Parser.Listener;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assert;
import org.junit.Test;
@ -74,6 +77,8 @@ import static org.junit.Assert.fail;
public class PushStreamTest extends AbstractTest
{
private static final Logger LOG = Log.getLogger(PushStreamTest.class);
@Test
public void testSynPushStream() throws Exception
{
@ -86,7 +91,7 @@ public class PushStreamTest extends AbstractTest
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
stream.syn(new SynInfo(new Fields(), true), new Promise.Adapter<Stream>());
stream.push(new PushInfo(new Fields(), true), new Promise.Adapter<Stream>());
return null;
}
}), new SessionFrameListener.Adapter()
@ -154,7 +159,7 @@ public class PushStreamTest extends AbstractTest
streamDataSent.countDown();
if (pushStreamDataReceived.getCount() == 2)
{
Stream pushStream = stream.syn(new SynInfo(new Fields(), false));
Stream pushStream = stream.push(new PushInfo(new Fields(), false));
streamExchanger.exchange(pushStream, 5, TimeUnit.SECONDS);
}
}
@ -246,20 +251,21 @@ public class PushStreamTest extends AbstractTest
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.reply(new ReplyInfo(true), new Callback.Adapter());
stream.syn(new SynInfo(1, TimeUnit.SECONDS, new Fields(), false, (byte)0), new Promise.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
{
pushStreamFailedLatch.countDown();
}
});
stream.push(new PushInfo(1, TimeUnit.SECONDS, new Fields(), false),
new Promise.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
{
pushStreamFailedLatch.countDown();
}
});
return super.onSyn(stream, synInfo);
}
}), new SessionFrameListener.Adapter());
clientSession.syn(new SynInfo(new Fields(), true), null);
assertThat("pushStream syn has failed", pushStreamFailedLatch.await(5, TimeUnit.SECONDS), is(true));
assertThat("pushStream push has failed", pushStreamFailedLatch.await(5, TimeUnit.SECONDS), is(true));
}
@Test
@ -279,7 +285,7 @@ public class PushStreamTest extends AbstractTest
{
try
{
Stream pushStream = stream.syn(new SynInfo(new Fields(), false));
Stream pushStream = stream.push(new PushInfo(new Fields(), false));
stream.reply(new ReplyInfo(true));
// wait until stream is closed
streamClosedLatch.await(5, TimeUnit.SECONDS);
@ -391,7 +397,7 @@ public class PushStreamTest extends AbstractTest
try
{
stream.reply(new ReplyInfo(false), new Callback.Adapter());
pushStream = stream.syn(new SynInfo(new Fields(), false));
pushStream = stream.push(new PushInfo(new Fields(), false));
resetReceivedLatch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e)
@ -400,8 +406,15 @@ public class PushStreamTest extends AbstractTest
unexpectedExceptionOccurred.set(true);
}
assert pushStream != null;
pushStream.data(new BytesDataInfo(transferBytes, true), new Callback.Adapter());
stream.data(new StringDataInfo("close", true), new Callback.Adapter());
try
{
pushStream.data(new BytesDataInfo(transferBytes, true));
stream.data(new StringDataInfo("close", true));
}
catch (InterruptedException | ExecutionException | TimeoutException e)
{
LOG.debug(e.getMessage());
}
}
}).start();
return null;
@ -527,7 +540,7 @@ public class PushStreamTest extends AbstractTest
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
stream.syn(new SynInfo(new Fields(), false), new Promise.Adapter<Stream>());
stream.push(new PushInfo(new Fields(), false), new Promise.Adapter<Stream>());
return null;
}
}), new SessionFrameListener.Adapter()

View File

@ -183,7 +183,7 @@ public class ResetStreamTest extends AbstractTest
});
Stream stream = session.syn(new SynInfo(5, TimeUnit.SECONDS, new Fields(), false, (byte)0), null);
assertThat("syn is received by server", synLatch.await(5, TimeUnit.SECONDS), is(true));
assertThat("push is received by server", synLatch.await(5, TimeUnit.SECONDS), is(true));
stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "data", false), new Callback.Adapter());
assertThat("stream is reset", rstLatch.await(5, TimeUnit.SECONDS), is(true));
stream.data(new StringDataInfo(5, TimeUnit.SECONDS, "2nd dataframe", false), new Callback.Adapter()