481718 - Improve stream interleaving.

Introduced the constraint in the API that Stream.headers() and
Stream.data() calls cannot be invoked if the previous operation did
not complete.

Improved interleaving by appending unfinished DATA frames at the end
of the queue, rather than prepending them.
This commit is contained in:
Simone Bordet 2015-11-09 21:58:48 +01:00
parent b800ffc983
commit 279e56d336
8 changed files with 336 additions and 79 deletions

View File

@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
@ -64,6 +65,7 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@ -293,11 +295,16 @@ public abstract class FlowControlStrategyTest
Stream stream = promise.get(5, TimeUnit.SECONDS);
// Send first chunk that exceeds the window.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), Callback.NOOP);
Callback.Completable completable = new Callback.Completable();
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), false), completable);
settingsLatch.await(5, TimeUnit.SECONDS);
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.NOOP);
completable.thenRun(() ->
{
// Send the second chunk of data, must not arrive since we're flow control stalled on the client.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(size * 2), true), Callback.NOOP);
});
Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));
// Consume the data arrived to server, this will resume flow control on the client.
@ -325,10 +332,13 @@ public abstract class FlowControlStrategyTest
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(responseFrame, Callback.NOOP);
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
stream.data(dataFrame, Callback.NOOP);
CompletableFuture<Void> completable = new CompletableFuture<>();
stream.headers(responseFrame, Callback.from(completable));
completable.thenRun(() ->
{
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true);
stream.data(dataFrame, Callback.NOOP);
});
return null;
}
});
@ -416,7 +426,7 @@ public abstract class FlowControlStrategyTest
public Stream.Listener onNewStream(Stream stream, HeadersFrame requestFrame)
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, true);
stream.headers(responseFrame, Callback.NOOP);
return new Stream.Listener.Adapter()
{
@ -527,9 +537,13 @@ public abstract class FlowControlStrategyTest
// For every stream, send down half the window size of data.
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(responseFrame, Callback.NOOP);
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(windowSize / 2), true);
stream.data(dataFrame, Callback.NOOP);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
completable.thenRun(() ->
{
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(windowSize / 2), true);
stream.data(dataFrame, Callback.NOOP);
});
return null;
}
}
@ -615,9 +629,13 @@ public abstract class FlowControlStrategyTest
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(responseFrame, Callback.NOOP);
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.wrap(data), true);
stream.data(dataFrame, Callback.NOOP);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
completable.thenRun(() ->
{
DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.wrap(data), true);
stream.data(dataFrame, Callback.NOOP);
});
return null;
}
});
@ -647,6 +665,11 @@ public abstract class FlowControlStrategyTest
Assert.assertArrayEquals(data, bytes);
}
// TODO
// Since we changed the API to disallow consecutive data() calls without waiting
// for the callback, it is now not possible to have DATA1, DATA2 in the queue for
// the same stream. Perhaps this test should just be deleted.
@Ignore
@Test
public void testServerTwoDataFramesWithStalledStream() throws Exception
{
@ -734,7 +757,8 @@ public abstract class FlowControlStrategyTest
{
MetaData metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(responseFrame, Callback.NOOP);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
return new Stream.Listener.Adapter()
{
@Override
@ -745,7 +769,8 @@ public abstract class FlowControlStrategyTest
ByteBuffer data = frame.getData();
ByteBuffer copy = ByteBuffer.allocateDirect(data.remaining());
copy.put(data).flip();
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), callback);
completable.thenRun(() ->
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), callback));
}
};
}
@ -770,9 +795,9 @@ public abstract class FlowControlStrategyTest
final ByteBuffer responseContent = ByteBuffer.wrap(responseData);
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
Promise.Completable<Stream> completable = new Promise.Completable<>();
final CountDownLatch latch = new CountDownLatch(1);
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter()
session.newStream(requestFrame, completable, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
@ -783,11 +808,12 @@ public abstract class FlowControlStrategyTest
latch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true);
stream.data(dataFrame, Callback.NOOP);
completable.thenAccept(stream ->
{
ByteBuffer requestContent = ByteBuffer.wrap(requestData);
DataFrame dataFrame = new DataFrame(stream.getId(), requestContent, true);
stream.data(dataFrame, Callback.NOOP);
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
@ -815,9 +841,9 @@ public abstract class FlowControlStrategyTest
// Consume the whole session and stream window.
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame requestFrame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter());
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
CompletableFuture<Stream> completable = new CompletableFuture<>();
session.newStream(requestFrame, Promise.from(completable), new Stream.Listener.Adapter());
Stream stream = completable.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
final CountDownLatch dataLatch = new CountDownLatch(1);
stream.data(new DataFrame(stream.getId(), data, false), new Callback.NonBlocking()

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
@ -33,6 +34,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.api.Session;
@ -41,6 +43,7 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
@ -257,7 +260,7 @@ public class HTTP2Test extends AbstractTest
}
});
Thread.sleep(1000);
sleep(1000);
server.stop();
@ -279,10 +282,191 @@ public class HTTP2Test extends AbstractTest
newClient(new Session.Listener.Adapter());
Thread.sleep(1000);
sleep(1000);
client.stop();
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInvalidAPIUsageOnClient() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
Callback.Completable completable = new Callback.Completable();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, false), completable);
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
{
completable.thenRun(() ->
{
DataFrame endFrame = new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true);
stream.data(endFrame, Callback.NOOP);
});
}
}
};
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(metaData, null, false);
Promise.Completable<Stream> completable = new Promise.Completable<>();
CountDownLatch completeLatch = new CountDownLatch(2);
session.newStream(frame, completable, new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
completeLatch.countDown();
}
});
Stream stream = completable.get(5, TimeUnit.SECONDS);
long sleep = 1000;
DataFrame data1 = new DataFrame(stream.getId(), ByteBuffer.allocate(1024), false)
{
@Override
public ByteBuffer getData()
{
sleep(2 * sleep);
return super.getData();
}
};
DataFrame data2 = new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true);
new Thread(() ->
{
// The first data() call is legal, but slow.
stream.data(data1, new Callback()
{
@Override
public void succeeded()
{
stream.data(data2, NOOP);
}
});
}).start();
// Wait for the first data() call to happen.
sleep(sleep);
// This data call is illegal because it does not
// wait for the previous callback to complete.
stream.data(data2, new Callback()
{
@Override
public void failed(Throwable x)
{
if (x instanceof WritePendingException)
{
// Expected.
completeLatch.countDown();
}
}
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
private static void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException();
}
}
}
/*
@Test
public void testInvalidAPIUsageOnServer() throws Exception
{
long sleep = 1000;
CountDownLatch completeLatch = new CountDownLatch(2);
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
DataFrame dataFrame = new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true);
// The call to headers() is legal, but slow.
new Thread(() ->
{
stream.headers(new HeadersFrame(stream.getId(), response, null, false)
{
@Override
public MetaData getMetaData()
{
sleep(2 * sleep);
return super.getMetaData();
}
}, new Callback()
{
@Override
public void succeeded()
{
stream.data(dataFrame, NOOP);
}
});
}).start();
// Wait for the headers() call to happen.
sleep(sleep);
// This data call is illegal because it does not
// wait for the previous callback to complete.
stream.data(dataFrame, new Callback()
{
@Override
public void failed(Throwable x)
{
if (x instanceof WritePendingException)
{
// Expected.
completeLatch.countDown();
}
}
});
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("GET", new HttpFields());
HeadersFrame frame = new HeadersFrame(metaData, null, true);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
if (frame.isEndStream())
completeLatch.countDown();
}
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
*/

View File

@ -512,12 +512,20 @@ public class IdleTimeoutTest extends AbstractTest
session.newStream(requestFrame, promise, new Stream.Listener.Adapter());
final Stream stream = promise.get(5, TimeUnit.SECONDS);
Callback.Completable completable1 = new Callback.Completable();
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), Callback.NOOP);
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), completable1);
completable1.thenCompose(nil ->
{
Callback.Completable completable2 = new Callback.Completable();
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), false), completable2);
return completable2;
}).thenRun(() ->
{
sleep(idleTimeout / 2);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1), true), Callback.NOOP);
});
Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS));
}

View File

@ -122,24 +122,26 @@ public class StreamCloseTest extends AbstractTest
{
MetaData.Response metaData = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame response = new HeadersFrame(stream.getId(), metaData, null, false);
stream.headers(response, Callback.NOOP);
Callback.Completable completable = new Callback.Completable();
stream.headers(response, completable);
return new Stream.Listener.Adapter()
{
@Override
public void onData(final Stream stream, DataFrame frame, final Callback callback)
{
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
stream.data(frame, new Callback()
{
@Override
public void succeeded()
{
Assert.assertTrue(stream.isClosed());
Assert.assertEquals(0, stream.getSession().getStreams().size());
callback.succeeded();
serverDataLatch.countDown();
}
});
completable.thenRun(() ->
stream.data(frame, new Callback()
{
@Override
public void succeeded()
{
Assert.assertTrue(stream.isClosed());
Assert.assertEquals(0, stream.getSession().getStreams().size());
callback.succeeded();
serverDataLatch.countDown();
}
}));
}
};
}

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -126,29 +127,38 @@ public class StreamResetTest extends AbstractTest
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, 200, new HttpFields());
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, false);
stream.headers(responseFrame, Callback.NOOP);
Callback.Completable completable = new Callback.Completable();
stream.headers(responseFrame, completable);
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callback.succeeded();
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), Callback.NOOP);
serverDataLatch.countDown();
completable.thenRun(() ->
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback()
{
@Override
public void succeeded()
{
serverDataLatch.countDown();
}
}));
}
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onReset(Stream s, ResetFrame frame)
{
// Simulate that there is pending data to send.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(16), true), new Callback()
IStream stream = (IStream)s;
stream.getSession().frames(stream, new Callback()
{
@Override
public void failed(Throwable x)
{
serverResetLatch.countDown();
}
});
}, new DataFrame(s.getId(), ByteBuffer.allocate(16), true));
}
};
}

View File

@ -1164,9 +1164,10 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (dataFrame.remaining() > 0)
{
// We have written part of the frame, but there is more to write.
// We need to keep the correct ordering of frames, to avoid that other
// frames for the same stream are written before this one is finished.
flusher.prepend(this);
// The API will not allow to send two data frames for the same
// stream so we append the unfinished frame at the end to allow
// better interleaving with other streams.
flusher.append(this);
}
else
{

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
@ -39,12 +40,13 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class HTTP2Stream extends IdleTimeout implements IStream
public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{
private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicReference<Callback> writing = new AtomicReference<>();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final ISession session;
@ -75,8 +77,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public void headers(HeadersFrame frame, Callback callback)
{
if (!checkWrite(callback))
return;
notIdle();
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
session.frames(this, this, frame, Frame.EMPTY_ARRAY);
}
@Override
@ -89,8 +93,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream
@Override
public void data(DataFrame frame, Callback callback)
{
if (!checkWrite(callback))
return;
notIdle();
session.data(this, callback, frame);
session.data(this, this, frame);
}
@Override
@ -103,6 +109,14 @@ public class HTTP2Stream extends IdleTimeout implements IStream
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
}
private boolean checkWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
callback.failed(new WritePendingException());
return false;
}
@Override
public Object getAttribute(String key)
{
@ -352,6 +366,20 @@ public class HTTP2Stream extends IdleTimeout implements IStream
onClose();
}
@Override
public void succeeded()
{
Callback callback = writing.getAndSet(null);
callback.succeeded();
}
@Override
public void failed(Throwable x)
{
Callback callback = writing.getAndSet(null);
callback.failed(x);
}
private void notifyData(Stream stream, DataFrame frame, Callback callback)
{
final Listener listener = this.listener;

View File

@ -44,7 +44,6 @@ public class HttpTransportOverHTTP2 implements HttpTransport
private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class);
private final AtomicBoolean commit = new AtomicBoolean();
private final Callback commitCallback = new CommitCallback();
private final Connector connector;
private final HTTP2ServerConnection connection;
private IStream stream;
@ -62,7 +61,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
// copying we can defer to the endpoint
return connection.getEndPoint().isOptimizedForDirectBuffers();
}
public IStream getStream()
{
return stream;
@ -101,8 +100,24 @@ public class HttpTransportOverHTTP2 implements HttpTransport
{
if (hasContent)
{
commit(info, false, commitCallback);
send(content, lastContent, callback);
commit(info, false, new Callback()
{
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} committed", stream.getId());
send(content, lastContent, callback);
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
callback.failed(x);
}
});
}
else
{
@ -145,7 +160,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (LOG.isDebugEnabled())
LOG.debug("HTTP/2 Push {}",request);
stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise<Stream>()
{
@Override
@ -211,21 +226,4 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (stream != null)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP);
}
private class CommitCallback implements Callback.NonBlocking
{
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #{} committed", stream.getId());
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x);
}
}
}