402277 spdy proxy: fix race condition in nested push streams initiated by upstream server. Fix several other small proxy issues

This commit is contained in:
Thomas Becker 2013-03-04 10:24:58 +01:00
parent c396622770
commit 1aa8fce78b
7 changed files with 355 additions and 158 deletions

View File

@ -1350,6 +1350,7 @@ public class StandardSession implements ISession, Parser.Listener, Dumpable
{
bufferPool.release(buffer);
IStream stream = getStream();
dataInfo.consume(size);
flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
if (dataInfo.available() > 0)
{

View File

@ -403,7 +403,7 @@ public class StandardStream implements IStream
if (isLocallyClosed())
{
session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
}
// Cannot update the close state here, because the data that we send may
@ -481,7 +481,7 @@ public class StandardStream implements IStream
@Override
public String toString()
{
return String.format("stream=%d v%d windowSize=%db reset=%s prio=%d %s %s", getId(), session.getVersion(),
return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
getWindowSize(), isReset(), priority, openState, closeState);
}

View File

@ -97,13 +97,13 @@ public interface Stream
/**
* <p>Initiate a unidirectional spdy pushstream associated to this stream asynchronously<p> <p>Callers may pass a
* non-null completion callback to be notified of when the pushstream has been established.</p>
* non-null completion promise to be notified of when the pushstream has been established.</p>
*
* @param pushInfo the metadata to send on stream creation
* @param callback the completion callback that gets notified once the pushstream is established
* @param promise the completion promise that gets notified once the pushstream is established
* @see #push(PushInfo)
*/
public void push(PushInfo pushInfo, Promise<Stream> callback);
public void push(PushInfo pushInfo, Promise<Stream> promise);
/**
* <p>Sends asynchronously a SYN_REPLY frame in response to a SYN_STREAM frame.</p> <p>Callers may use the returned

View File

@ -58,7 +58,7 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
{
private static final Logger LOG = Log.getLogger(SPDYProxyEngine.class);
private static final String STREAM_HANDLER_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamHandler";
private static final String STREAM_PROMISE_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.streamPromise";
private static final String CLIENT_STREAM_ATTRIBUTE = "org.eclipse.jetty.spdy.server.http.proxy.clientStream";
private final ConcurrentMap<String, Session> serverSessions = new ConcurrentHashMap<>();
@ -113,9 +113,9 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
SynInfo serverSynInfo = new SynInfo(headers, clientSynInfo.isClose());
StreamFrameListener listener = new ProxyStreamFrameListener(clientStream);
StreamHandler handler = new StreamHandler(clientStream, serverSynInfo);
clientStream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
serverSession.syn(serverSynInfo, listener, handler);
StreamPromise promise = new StreamPromise(clientStream, serverSynInfo);
clientStream.setAttribute(STREAM_PROMISE_ATTRIBUTE, promise);
serverSession.syn(serverSynInfo, listener, promise);
return this;
}
@ -166,8 +166,8 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
}
};
StreamHandler streamHandler = (StreamHandler)clientStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
streamHandler.data(serverDataInfo);
StreamPromise streamPromise = (StreamPromise)clientStream.getAttribute(STREAM_PROMISE_ATTRIBUTE);
streamPromise.data(serverDataInfo);
}
private Session produceSession(String host, short version, InetSocketAddress address)
@ -219,87 +219,101 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
stream.getSession().rst(rstInfo, new Callback.Adapter());
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
private class ProxyPushStreamFrameListener implements StreamFrameListener
{
private final Stream clientStream;
private volatile ReplyInfo replyInfo;
private PushStreamPromise pushStreamPromise;
public ProxyStreamFrameListener(Stream clientStream)
private ProxyPushStreamFrameListener(PushStreamPromise pushStreamPromise)
{
this.clientStream = clientStream;
this.pushStreamPromise = pushStreamPromise;
}
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
LOG.debug("S -> P pushed {} on {}", pushInfo, stream);
LOG.debug("S -> P pushed {} on {}. Opening new PushStream P -> C now.", pushInfo, stream);
PushStreamPromise newPushStreamPromise = new PushStreamPromise(stream, pushInfo);
this.pushStreamPromise.push(newPushStreamPromise);
return new ProxyPushStreamFrameListener(newPushStreamPromise);
}
Fields headers = new Fields(pushInfo.getHeaders(), false);
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
Stream clientStream = (Stream)stream.getAssociatedStream().getAttribute
(CLIENT_STREAM_ATTRIBUTE);
convert(stream.getSession().getVersion(), clientStream.getSession().getVersion(),
headers);
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
StreamHandler handler = new StreamHandler(clientStream, pushInfo);
stream.setAttribute(STREAM_HANDLER_ATTRIBUTE, handler);
clientStream.push(new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers,
pushInfo.isClose()),
handler);
return new Adapter()
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
public void consume(int delta)
{
// Push streams never send a reply
throw new UnsupportedOperationException();
}
@Override
public void onHeaders(Stream stream, HeadersInfo headersInfo)
{
throw new UnsupportedOperationException();
}
@Override
public void onData(Stream serverStream, final DataInfo serverDataInfo)
{
LOG.debug("S -> P pushed {} on {}", serverDataInfo, serverStream);
ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
StreamHandler handler = (StreamHandler)serverStream.getAttribute(STREAM_HANDLER_ATTRIBUTE);
handler.data(clientDataInfo);
super.consume(delta);
serverDataInfo.consume(delta);
}
};
pushStreamPromise.data(clientDataInfo);
}
}
private class ProxyStreamFrameListener extends StreamFrameListener.Adapter
{
private final Stream receiverStream;
public ProxyStreamFrameListener(Stream receiverStream)
{
this.receiverStream = receiverStream;
}
@Override
public StreamFrameListener onPush(Stream senderStream, PushInfo pushInfo)
{
LOG.debug("S -> P {} on {}");
PushInfo newPushInfo = convertPushInfo(pushInfo, senderStream, receiverStream);
PushStreamPromise pushStreamPromise = new PushStreamPromise(senderStream, newPushInfo);
receiverStream.push(newPushInfo, pushStreamPromise);
return new ProxyPushStreamFrameListener(pushStreamPromise);
}
@Override
public void onReply(final Stream stream, ReplyInfo replyInfo)
{
LOG.debug("S -> P {} on {}", replyInfo, stream);
final ReplyInfo clientReplyInfo = new ReplyInfo(convertHeaders(stream, receiverStream, replyInfo.getHeaders()),
replyInfo.isClose());
reply(stream, clientReplyInfo);
}
short serverVersion = stream.getSession().getVersion();
Fields headers = new Fields(replyInfo.getHeaders(), false);
private void reply(final Stream stream, final ReplyInfo clientReplyInfo)
{
receiverStream.reply(clientReplyInfo, new Callback()
{
@Override
public void succeeded()
{
LOG.debug("P -> C {} from {} to {}", clientReplyInfo, stream, receiverStream);
}
addResponseProxyHeaders(stream, headers);
customizeResponseHeaders(stream, headers);
short clientVersion = this.clientStream.getSession().getVersion();
convert(serverVersion, clientVersion, headers);
this.replyInfo = new ReplyInfo(headers, replyInfo.isClose());
if (replyInfo.isClose())
reply(stream);
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(receiverStream);
}
});
}
@Override
@ -313,101 +327,82 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
public void onData(final Stream stream, final DataInfo dataInfo)
{
LOG.debug("S -> P {} on {}", dataInfo, stream);
if (replyInfo != null)
{
if (dataInfo.isClose())
replyInfo.getHeaders().put("content-length", String.valueOf(dataInfo.available()));
reply(stream);
}
data(stream, dataInfo);
}
private void reply(final Stream stream)
private void data(final Stream stream, final DataInfo serverDataInfo)
{
final ReplyInfo replyInfo = this.replyInfo;
this.replyInfo = null;
clientStream.reply(replyInfo, new Callback()
final ByteBufferDataInfo clientDataInfo = new ByteBufferDataInfo(serverDataInfo.asByteBuffer(false), serverDataInfo.isClose())
{
@Override
public void consume(int delta)
{
super.consume(delta);
serverDataInfo.consume(delta);
}
};
receiverStream.data(clientDataInfo, new Callback() //TODO: timeout???
{
@Override
public void succeeded()
{
LOG.debug("P -> C {} from {} to {}", replyInfo, stream, clientStream);
LOG.debug("P -> C {} from {} to {}", clientDataInfo, stream, receiverStream);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
}
});
}
private void data(final Stream stream, final DataInfo dataInfo)
{
clientStream.data(dataInfo, new Callback() //TODO: timeout???
{
@Override
public void succeeded()
{
dataInfo.consume(dataInfo.length());
LOG.debug("P -> C {} from {} to {}", dataInfo, stream, clientStream);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
rst(receiverStream);
}
});
}
}
/**
* <p>{@link StreamHandler} implements the forwarding of DATA frames from the client to the server.</p> <p>Instances
* of this class buffer DATA frames sent by clients and send them to the server. The buffering happens between the
* send of the SYN_STREAM to the server (where DATA frames may arrive from the client before the SYN_STREAM has been
* fully sent), and between DATA frames, if the client is a fast producer and the server a slow consumer, or if the
* client is a SPDY v2 client (and hence without flow control) while the server is a SPDY v3 server (and hence with
* flow control).</p>
* <p>{@link StreamPromise} implements the forwarding of DATA frames from the client to the server and vice
* versa.</p> <p>Instances of this class buffer DATA frames sent by clients and send them to the server. The
* buffering happens between the send of the SYN_STREAM to the server (where DATA frames may arrive from the client
* before the SYN_STREAM has been fully sent), and between DATA frames, if the client is a fast producer and the
* server a slow consumer, or if the client is a SPDY v2 client (and hence without flow control) while the server is
* a SPDY v3 server (and hence with flow control).</p>
*/
private class StreamHandler implements Promise<Stream>
private class StreamPromise implements Promise<Stream>
{
private final Queue<DataInfoHandler> queue = new LinkedList<>();
private final Stream clientStream;
private final Queue<DataInfoCallback> queue = new LinkedList<>();
private final Stream senderStream;
private final Info info;
private Stream serverStream;
private Stream receiverStream;
private StreamHandler(Stream clientStream, Info info)
private StreamPromise(Stream senderStream, Info info)
{
this.clientStream = clientStream;
this.senderStream = senderStream;
this.info = info;
}
@Override
public void succeeded(Stream serverStream)
public void succeeded(Stream stream)
{
LOG.debug("P -> S {} from {} to {}", info, clientStream, serverStream);
LOG.debug("P -> S {} from {} to {}", info, senderStream, stream);
serverStream.setAttribute(CLIENT_STREAM_ATTRIBUTE, clientStream);
stream.setAttribute(CLIENT_STREAM_ATTRIBUTE, senderStream);
DataInfoHandler dataInfoHandler;
DataInfoCallback dataInfoCallback;
synchronized (queue)
{
this.serverStream = serverStream;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
this.receiverStream = stream;
dataInfoCallback = queue.peek();
if (dataInfoCallback != null)
{
if (dataInfoHandler.flushing)
if (dataInfoCallback.flushing)
{
LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoHandler.dataInfo, queue.size());
dataInfoHandler = null;
LOG.debug("SYN completed, flushing {}, queue size {}", dataInfoCallback.dataInfo, queue.size());
dataInfoCallback = null;
}
else
{
dataInfoHandler.flushing = true;
dataInfoCallback.flushing = true;
LOG.debug("SYN completed, queue size {}", queue.size());
}
}
@ -416,37 +411,37 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("SYN completed, queue empty");
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
if (dataInfoCallback != null)
flush(stream, dataInfoCallback);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
rst(senderStream);
}
public void data(DataInfo dataInfo)
{
Stream serverStream;
DataInfoHandler dataInfoHandler = null;
DataInfoHandler item = new DataInfoHandler(dataInfo);
Stream receiverStream;
DataInfoCallback dataInfoCallbackToFlush = null;
DataInfoCallback dataInfoCallBackToQueue = new DataInfoCallback(dataInfo);
synchronized (queue)
{
queue.offer(item);
serverStream = this.serverStream;
if (serverStream != null)
queue.offer(dataInfoCallBackToQueue);
receiverStream = this.receiverStream;
if (receiverStream != null)
{
dataInfoHandler = queue.peek();
if (dataInfoHandler.flushing)
dataInfoCallbackToFlush = queue.peek();
if (dataInfoCallbackToFlush.flushing)
{
LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoHandler.dataInfo, queue.size());
serverStream = null;
LOG.debug("Queued {}, flushing {}, queue size {}", dataInfo, dataInfoCallbackToFlush.dataInfo, queue.size());
receiverStream = null;
}
else
{
dataInfoHandler.flushing = true;
dataInfoCallbackToFlush.flushing = true;
LOG.debug("Queued {}, queue size {}", dataInfo, queue.size());
}
}
@ -455,22 +450,22 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("Queued {}, SYN incomplete, queue size {}", dataInfo, queue.size());
}
}
if (serverStream != null)
flush(serverStream, dataInfoHandler);
if (receiverStream != null)
flush(receiverStream, dataInfoCallbackToFlush);
}
private void flush(Stream serverStream, DataInfoHandler dataInfoHandler)
private void flush(Stream receiverStream, DataInfoCallback dataInfoCallback)
{
LOG.debug("P -> S {} on {}", dataInfoHandler.dataInfo, serverStream);
serverStream.data(dataInfoHandler.dataInfo, dataInfoHandler); //TODO: timeout???
LOG.debug("P -> S {} on {}", dataInfoCallback.dataInfo, receiverStream);
receiverStream.data(dataInfoCallback.dataInfo, dataInfoCallback); //TODO: timeout???
}
private class DataInfoHandler implements Callback
private class DataInfoCallback implements Callback
{
private final DataInfo dataInfo;
private boolean flushing;
private DataInfoHandler(DataInfo dataInfo)
private DataInfoCallback(DataInfo dataInfo)
{
this.dataInfo = dataInfo;
}
@ -479,18 +474,18 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
public void succeeded()
{
Stream serverStream;
DataInfoHandler dataInfoHandler;
DataInfoCallback dataInfoCallback;
synchronized (queue)
{
serverStream = StreamHandler.this.serverStream;
serverStream = StreamPromise.this.receiverStream;
assert serverStream != null;
dataInfoHandler = queue.poll();
assert dataInfoHandler == this;
dataInfoHandler = queue.peek();
if (dataInfoHandler != null)
dataInfoCallback = queue.poll();
assert dataInfoCallback == this;
dataInfoCallback = queue.peek();
if (dataInfoCallback != null)
{
assert !dataInfoHandler.flushing;
dataInfoHandler.flushing = true;
assert !dataInfoCallback.flushing;
dataInfoCallback.flushing = true;
LOG.debug("Completed {}, queue size {}", dataInfo, queue.size());
}
else
@ -498,22 +493,71 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
LOG.debug("Completed {}, queue empty", dataInfo);
}
}
if (dataInfoHandler != null)
flush(serverStream, dataInfoHandler);
if (dataInfoCallback != null)
flush(serverStream, dataInfoCallback);
}
@Override
public void failed(Throwable x)
{
LOG.debug(x);
rst(clientStream);
rst(senderStream);
}
}
public Stream getSenderStream()
{
return senderStream;
}
public Info getInfo()
{
return info;
}
public Stream getReceiverStream()
{
synchronized (queue)
{
return receiverStream;
}
}
}
private class PushStreamPromise extends StreamPromise
{
private volatile PushStreamPromise pushStreamPromise;
private PushStreamPromise(Stream senderStream, PushInfo pushInfo)
{
super(senderStream, pushInfo);
}
@Override
public void succeeded(Stream receiverStream)
{
super.succeeded(receiverStream);
LOG.debug("P -> C PushStreamPromise.succeeded() called with pushStreamPromise: {}", pushStreamPromise);
PushStreamPromise promise = pushStreamPromise;
if (promise != null)
receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
}
public void push(PushStreamPromise pushStreamPromise)
{
Stream receiverStream = getReceiverStream();
if (receiverStream != null)
receiverStream.push(convertPushInfo((PushInfo)getInfo(), getSenderStream(), receiverStream), pushStreamPromise);
else
this.pushStreamPromise = pushStreamPromise;
}
}
private class ProxySessionFrameListener extends SessionFrameListener.Adapter
{
@Override
public void onRst(Session serverSession, RstInfo serverRstInfo)
{
@ -536,4 +580,20 @@ public class SPDYProxyEngine extends ProxyEngine implements StreamFrameListener
serverSessions.values().remove(serverSession);
}
}
private PushInfo convertPushInfo(PushInfo pushInfo, Stream from, Stream to)
{
Fields headersToConvert = pushInfo.getHeaders();
Fields headers = convertHeaders(from, to, headersToConvert);
return new PushInfo(getTimeout(), TimeUnit.MILLISECONDS, headers, pushInfo.isClose());
}
private Fields convertHeaders(Stream from, Stream to, Fields headersToConvert)
{
Fields headers = new Fields(headersToConvert, false);
addResponseProxyHeaders(from, headers);
customizeResponseHeaders(from, headers);
convert(from.getSession().getVersion(), to.getSession().getVersion(), headers);
return headers;
}
}

View File

@ -566,7 +566,6 @@ public class ReferrerPushStrategyTest extends AbstractHTTPSPDYTest
final CountDownLatch mainStreamLatch = new CountDownLatch(2);
final CountDownLatch pushDataLatch = new CountDownLatch(2);
Session session2 = startClient(version, address, null);
LOG.warn("REQUEST FOR PUSHED RESOURCES");
session2.syn(new SynInfo(mainRequestHeaders, true), new StreamFrameListener.Adapter()
{
@Override

View File

@ -311,6 +311,8 @@ public class ProxyHTTPToSPDYTest
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
responseHeaders.put("content-length", String.valueOf(data.length));
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo, new Callback.Adapter());
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
@ -437,6 +439,7 @@ public class ProxyHTTPToSPDYTest
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
responseHeaders.put("content-length", String.valueOf(data.length));
ReplyInfo replyInfo = new ReplyInfo(responseHeaders, false);
stream.reply(replyInfo, new Callback.Adapter());
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());

View File

@ -328,6 +328,140 @@ public class ProxySPDYToSPDYTest
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
}
@Test
public void testSYNThenSPDYNestedPushIsReceived() throws Exception
{
final byte[] data = "0123456789ABCDEF".getBytes("UTF-8");
InetSocketAddress proxyAddress = startProxy(startServer(new ServerSessionFrameListener.Adapter()
{
@Override
public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
{
Fields responseHeaders = new Fields();
responseHeaders.put(HTTPSPDYHeader.VERSION.name(version), "HTTP/1.1");
responseHeaders.put(HTTPSPDYHeader.STATUS.name(version), "200 OK");
stream.reply(new ReplyInfo(responseHeaders, false), new Callback.Adapter());
final Fields pushHeaders = new Fields();
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/push");
stream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/nestedpush");
pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
pushHeaders.put(HTTPSPDYHeader.URI.name(version), "/anothernestedpush");
pushStream.push(new PushInfo(5, TimeUnit.SECONDS, pushHeaders, false), new Adapter<Stream>()
{
@Override
public void succeeded(Stream pushStream)
{
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
}
});
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
}
});
pushStream.data(new BytesDataInfo(data, true), new Callback.Adapter());
}
});
stream.data(new BytesDataInfo(data, true), new Callback.Adapter());
return null;
}
}));
proxyConnector.addConnectionFactory(proxyConnector.getConnectionFactory("spdy/" + version));
final CountDownLatch pushSynLatch = new CountDownLatch(3);
final CountDownLatch pushDataLatch = new CountDownLatch(3);
Session client = factory.newSPDYClient(version).connect(proxyAddress, null).get(5, TimeUnit.SECONDS);
Fields headers = new Fields();
headers.put(HTTPSPDYHeader.HOST.name(version), "localhost:" + proxyAddress.getPort());
final CountDownLatch replyLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
client.syn(new SynInfo(headers, true), new StreamFrameListener.Adapter()
{
// onPush for 1st push stream
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new StreamFrameListener.Adapter()
{
// onPush for 2nd nested push stream
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new Adapter()
{
// onPush for 3rd nested push stream
@Override
public StreamFrameListener onPush(Stream stream, PushInfo pushInfo)
{
pushSynLatch.countDown();
return new Adapter()
{
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
pushDataLatch.countDown();
}
};
}
@Override
public void onReply(Stream stream, ReplyInfo replyInfo)
{
replyLatch.countDown();
}
@Override
public void onData(Stream stream, DataInfo dataInfo)
{
dataInfo.consume(dataInfo.length());
if (dataInfo.isClose())
dataLatch.countDown();
}
});
Assert.assertTrue(replyLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushSynLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(pushDataLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
client.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
}
@Test
public void testPING() throws Exception
{