From bd62320285a17a8ec7b13be548aa81e5c82e80de Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 29 Feb 2016 18:45:30 +0100 Subject: [PATCH] Issue #360 (Improve HTTP/2 stream interleaving) Fixed by making the interleave quantum be the frame size rather than the flow control window size. Reworked HTTP2Flusher.process() to be simpler and properly interleave frames. --- .../eclipse/jetty/http2/client/HTTP2Test.java | 50 ++++ .../jetty/http2/client/InterleavingTest.java | 231 ++++++++++++++++++ .../jetty/http2/client/StreamCloseTest.java | 31 ++- .../org/eclipse/jetty/http2/HTTP2Flusher.java | 192 +++++++-------- .../org/eclipse/jetty/http2/HTTP2Session.java | 90 +++---- .../jetty/http2/generator/DataGenerator.java | 38 ++- .../jetty/http2/generator/Generator.java | 4 +- .../http2/frames/DataGenerateParseTest.java | 18 +- 8 files changed, 451 insertions(+), 203 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InterleavingTest.java diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java index 5ec01772d4b..a32d6686cec 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java @@ -88,6 +88,56 @@ public class HTTP2Test extends AbstractTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testRequestNoContentResponseEmptyContent() throws Exception + { + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, false), new Callback() + { + @Override + public void succeeded() + { + stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), NOOP); + } + }); + return null; + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + + HttpFields fields = new HttpFields(); + MetaData.Request metaData = newRequest("GET", fields); + HeadersFrame frame = new HeadersFrame(metaData, null, true); + final CountDownLatch latch = new CountDownLatch(1); + session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + Assert.assertFalse(frame.isEndStream()); + Assert.assertEquals(stream.getId(), frame.getStreamId()); + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + Assert.assertEquals(200, response.getStatus()); + } + + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + Assert.assertTrue(frame.isEndStream()); + callback.succeeded(); + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + @Test public void testRequestNoContentResponseContent() throws Exception { diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InterleavingTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InterleavingTest.java new file mode 100644 index 00000000000..d64b521f2b1 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/InterleavingTest.java @@ -0,0 +1,231 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Test; + +public class InterleavingTest extends AbstractTest +{ + @Test + public void testInterleaving() throws Exception + { + CountDownLatch serverStreamsLatch = new CountDownLatch(2); + List serverStreams = new ArrayList<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreams.add(stream); + serverStreamsLatch.countDown(); + return null; + } + }); + + int maxFrameSize = Frame.DEFAULT_MAX_LENGTH + 1; + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public Map onPreface(Session session) + { + Map settings = new HashMap<>(); + settings.put(SettingsFrame.MAX_FRAME_SIZE, maxFrameSize); + return settings; + } + }); + + BlockingQueue dataFrames = new LinkedBlockingDeque<>(); + Stream.Listener streamListener = new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + ByteBuffer data = frame.getData(); + byte[] bytes = new byte[data.remaining()]; + data.get(bytes); + dataFrames.offer(new FrameBytesCallback(frame, bytes, callback)); + } + }; + + HeadersFrame headersFrame1 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true); + FuturePromise streamPromise1 = new FuturePromise<>(); + session.newStream(headersFrame1, streamPromise1, streamListener); + streamPromise1.get(5, TimeUnit.SECONDS); + + HeadersFrame headersFrame2 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true); + FuturePromise streamPromise2 = new FuturePromise<>(); + session.newStream(headersFrame2, streamPromise2, streamListener); + streamPromise2.get(5, TimeUnit.SECONDS); + + Assert.assertTrue(serverStreamsLatch.await(5, TimeUnit.SECONDS)); + + Thread.sleep(1000); + + Stream serverStream1 = serverStreams.get(0); + Stream serverStream2 = serverStreams.get(1); + MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0); + serverStream1.headers(new HeadersFrame(serverStream1.getId(), response1, null, false), Callback.NOOP); + + Random random = new Random(); + byte[] content1 = new byte[2 * ((ISession)serverStream1.getSession()).updateSendWindow(0)]; + random.nextBytes(content1); + byte[] content2 = new byte[2 * ((ISession)serverStream2.getSession()).updateSendWindow(0)]; + random.nextBytes(content2); + + MetaData.Response response2 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0); + serverStream2.headers(new HeadersFrame(serverStream2.getId(), response2, null, false), new Callback() + { + @Override + public void succeeded() + { + // Write data for both streams from within the callback so that they get queued together. + + ByteBuffer buffer1 = ByteBuffer.wrap(content1); + serverStream1.data(new DataFrame(serverStream1.getId(), buffer1, true), NOOP); + + ByteBuffer buffer2 = ByteBuffer.wrap(content2); + serverStream2.data(new DataFrame(serverStream2.getId(), buffer2, true), NOOP); + } + }); + + // The client reads with a buffer size that is different from the + // frame size and synthesizes DATA frames, so expect N frames for + // stream1 up to maxFrameSize of data, then M frames for stream2 + // up to maxFrameSize of data, and so forth, interleaved. + + Map contents = new HashMap<>(); + contents.put(serverStream1.getId(), new ByteArrayOutputStream()); + contents.put(serverStream2.getId(), new ByteArrayOutputStream()); + List streamLengths = new ArrayList<>(); + int finished = 0; + while (finished < 2) + { + FrameBytesCallback frameBytesCallback = dataFrames.poll(5, TimeUnit.SECONDS); + if (frameBytesCallback == null) + Assert.fail(); + + DataFrame dataFrame = frameBytesCallback.frame; + int streamId = dataFrame.getStreamId(); + int length = dataFrame.remaining(); + streamLengths.add(new StreamLength(streamId, length)); + if (dataFrame.isEndStream()) + ++finished; + + contents.get(streamId).write(frameBytesCallback.bytes); + + frameBytesCallback.callback.succeeded(); + } + + // Verify that the content has been sent properly. + Assert.assertArrayEquals(content1, contents.get(serverStream1.getId()).toByteArray()); + Assert.assertArrayEquals(content2, contents.get(serverStream2.getId()).toByteArray()); + + // Verify that the interleaving is correct. + Map> groups = new HashMap<>(); + groups.put(serverStream1.getId(), new ArrayList<>()); + groups.put(serverStream2.getId(), new ArrayList<>()); + int currentStream = 0; + int currentLength = 0; + for (StreamLength streamLength : streamLengths) + { + if (currentStream == 0) + currentStream = streamLength.stream; + if (currentStream != streamLength.stream) + { + groups.get(currentStream).add(currentLength); + currentStream = streamLength.stream; + currentLength = 0; + } + currentLength += streamLength.length; + } + groups.get(currentStream).add(currentLength); + + Logger logger = Log.getLogger(getClass()); + logger.debug("frame lengths = {}", streamLengths); + + groups.forEach((stream, lengths) -> + { + logger.debug("stream {} interleaved lengths = {}", stream, lengths); + for (Integer length : lengths) + Assert.assertThat(length, Matchers.lessThanOrEqualTo(maxFrameSize)); + }); + } + + private static class FrameBytesCallback + { + private final DataFrame frame; + private final byte[] bytes; + private final Callback callback; + + private FrameBytesCallback(DataFrame frame, byte[] bytes, Callback callback) + { + this.frame = frame; + this.bytes = bytes; + this.callback = callback; + } + } + + private static class StreamLength + { + private final int stream; + private final int length; + + private StreamLength(int stream, int length) + { + this.stream = stream; + this.length = length; + } + + @Override + public String toString() + { + return String.format("(%d,%d)", stream, length); + } + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java index 9f0590f0dbb..ebe47c2fc23 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java @@ -131,24 +131,23 @@ public class StreamCloseTest extends AbstractTest { Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed()); + // We must copy the data that we send asynchronously. + ByteBuffer data = frame.getData(); + ByteBuffer copy = ByteBuffer.allocate(data.remaining()); + copy.put(data).flip(); + completable.thenRun(() -> - { - // We must copy the data that we send asynchronously. - ByteBuffer data = frame.getData(); - ByteBuffer copy = ByteBuffer.allocate(data.remaining()); - copy.put(data).flip(); - stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback() - { - @Override - public void succeeded() + stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback() { - Assert.assertTrue(stream.isClosed()); - Assert.assertEquals(0, stream.getSession().getStreams().size()); - callback.succeeded(); - serverDataLatch.countDown(); - } - }); - }); + @Override + public void succeeded() + { + Assert.assertTrue(stream.isClosed()); + Assert.assertEquals(0, stream.getSession().getStreams().size()); + callback.succeeded(); + serverDataLatch.countDown(); + } + })); } }; } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index 9ddd316d8b4..55d06b49871 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -22,9 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Queue; import org.eclipse.jetty.http2.frames.Frame; @@ -43,11 +41,11 @@ public class HTTP2Flusher extends IteratingCallback private final Queue windows = new ArrayDeque<>(); private final ArrayQueue frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this); - private final Map streams = new HashMap<>(); - private final List resets = new ArrayList<>(); + private final Queue entries = new ArrayDeque<>(); private final List actives = new ArrayList<>(); private final HTTP2Session session; private final ByteBufferPool.Lease lease; + private Entry stalled; private boolean terminated; public HTTP2Flusher(HTTP2Session session) @@ -106,17 +104,6 @@ public class HTTP2Flusher extends IteratingCallback return !closed; } - private Entry remove(int index) - { - synchronized (this) - { - if (index == 0) - return frames.pollUnsafe(); - else - return frames.remove(index); - } - } - public int getQueueSize() { synchronized (this) @@ -136,112 +123,75 @@ public class HTTP2Flusher extends IteratingCallback if (terminated) throw new ClosedChannelException(); - // First thing, update the window sizes, so we can - // reason about the frames to remove from the queue. while (!windows.isEmpty()) { WindowEntry entry = windows.poll(); entry.perform(); } - // Now the window sizes cannot change. - // Window updates that happen concurrently will - // be queued and processed on the next iteration. - int sessionWindow = session.getSendWindow(); - - int index = 0; - int size = frames.size(); - while (index < size) + if (!frames.isEmpty()) { - Entry entry = frames.get(index); - IStream stream = entry.stream; - - // If the stream has been reset, don't send the frame. - if (stream != null && stream.isReset() && !entry.isProtocol()) + for (Entry entry : frames) { - remove(index); - --size; - resets.add(entry); - if (LOG.isDebugEnabled()) - LOG.debug("Gathered for reset {}", entry); - continue; + entries.offer(entry); + actives.add(entry); } - - // Check if the frame fits in the flow control windows. - int remaining = entry.dataRemaining(); - if (remaining > 0) - { - if (sessionWindow <= 0) - { - ++index; - // There may be *non* flow controlled frames to send. - continue; - } - - if (stream != null) - { - // The stream may have a smaller window than the session. - Integer streamWindow = streams.get(stream); - if (streamWindow == null) - { - streamWindow = stream.updateSendWindow(0); - streams.put(stream, streamWindow); - } - - // Is it a frame belonging to an already stalled stream ? - if (streamWindow <= 0) - { - ++index; - // There may be *non* flow controlled frames to send. - continue; - } - } - - // The frame fits both flow control windows, reduce them. - sessionWindow -= remaining; - if (stream != null) - streams.put(stream, streams.get(stream) - remaining); - } - - // The frame will be written, remove it from the queue. - remove(index); - --size; - actives.add(entry); - - if (LOG.isDebugEnabled()) - LOG.debug("Gathered for write {}", entry); + frames.clear(); } - streams.clear(); } - // Perform resets outside the sync block. - for (int i = 0; i < resets.size(); ++i) - { - Entry entry = resets.get(i); - entry.reset(); - } - resets.clear(); - if (actives.isEmpty()) + if (entries.isEmpty()) { if (LOG.isDebugEnabled()) LOG.debug("Flushed {}", session); return Action.IDLE; } - for (int i = 0; i < actives.size(); ++i) + while (!entries.isEmpty()) { - Entry entry = actives.get(i); - Throwable failure = entry.generate(lease); - if (failure != null) + Entry entry = entries.poll(); + if (LOG.isDebugEnabled()) + LOG.debug("Processing {}", entry); + + // If the stream has been reset, don't send the frame. + if (entry.reset()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Resetting {}", entry); + continue; + } + + try + { + if (entry.generate(lease)) + { + if (entry.dataRemaining() > 0) + entries.offer(entry); + } + else + { + if (stalled == null) + stalled = entry; + } + } + catch (Throwable failure) { // Failure to generate the entry is catastrophic. + if (LOG.isDebugEnabled()) + LOG.debug("Failure generating frame " + entry.frame, failure); failed(failure); return Action.SUCCEEDED; } } List byteBuffers = lease.getByteBuffers(); + if (byteBuffers.isEmpty()) + { + complete(); + return Action.IDLE; + } + if (LOG.isDebugEnabled()) LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives); session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()])); @@ -251,17 +201,45 @@ public class HTTP2Flusher extends IteratingCallback @Override public void succeeded() { - lease.recycle(); - if (LOG.isDebugEnabled()) LOG.debug("Written {} frames for {}", actives.size(), actives); - actives.forEach(Entry::succeeded); - actives.clear(); + complete(); super.succeeded(); } + private void complete() + { + lease.recycle(); + + actives.forEach(Entry::complete); + + if (stalled != null) + { + // We have written part of the frame, but there is more to write. + // The API will not allow to send two data frames for the same + // stream so we append the unfinished frame at the end to allow + // better interleaving with other streams. + int index = actives.indexOf(stalled); + for (int i = index; i < actives.size(); ++i) + { + Entry entry = actives.get(i); + if (entry.dataRemaining() > 0) + append(entry); + } + for (int i = 0; i < index; ++i) + { + Entry entry = actives.get(i); + if (entry.dataRemaining() > 0) + append(entry); + } + stalled = null; + } + + actives.clear(); + } + @Override protected void onCompleteSuccess() { @@ -317,6 +295,7 @@ public class HTTP2Flusher extends IteratingCallback protected final Frame frame; protected final IStream stream; protected final Callback callback; + private boolean reset; protected Entry(Frame frame, IStream stream, Callback callback) { @@ -330,14 +309,14 @@ public class HTTP2Flusher extends IteratingCallback return 0; } - public Throwable generate(ByteBufferPool.Lease lease) - { - return null; - } + protected abstract boolean generate(ByteBufferPool.Lease lease); - public void reset() + private void complete() { - failed(new EofException("reset")); + if (reset) + failed(new EofException("reset")); + else + succeeded(); } @Override @@ -351,7 +330,12 @@ public class HTTP2Flusher extends IteratingCallback callback.failed(x); } - public boolean isProtocol() + private boolean reset() + { + return this.reset = stream != null && stream.isReset() && !isProtocol(); + } + + private boolean isProtocol() { switch (frame.getType()) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 72b74d6aba8..1b45f6f49ae 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -1048,22 +1048,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio super(frame, stream, callback); } - public Throwable generate(ByteBufferPool.Lease lease) + protected boolean generate(ByteBufferPool.Lease lease) { - try - { - generator.control(lease, frame); - if (LOG.isDebugEnabled()) - LOG.debug("Generated {}", frame); - prepare(); - return null; - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Failure generating frame " + frame, x); - return x; - } + generator.control(lease, frame); + if (LOG.isDebugEnabled()) + LOG.debug("Generated {}", frame); + prepare(); + return true; } /** @@ -1154,71 +1145,58 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private class DataEntry extends HTTP2Flusher.Entry { - private int length; + private int remaining; + private int generated; private DataEntry(DataFrame frame, IStream stream, Callback callback) { super(frame, stream, callback); - } - - @Override - public int dataRemaining() - { // We don't do any padding, so the flow control length is // always the data remaining. This simplifies the handling // of data frames that cannot be completely written due to // the flow control window exhausting, since in that case // we would have to count the padding only once. - return ((DataFrame)frame).remaining(); + remaining = frame.remaining(); } - public Throwable generate(ByteBufferPool.Lease lease) + @Override + public int dataRemaining() { - try - { - int flowControlLength = dataRemaining(); + return remaining; + } - int sessionSendWindow = getSendWindow(); - if (sessionSendWindow < 0) - throw new IllegalStateException(); + protected boolean generate(ByteBufferPool.Lease lease) + { + int toWrite = dataRemaining(); - int streamSendWindow = stream.updateSendWindow(0); - if (streamSendWindow < 0) - throw new IllegalStateException(); + int sessionSendWindow = getSendWindow(); + int streamSendWindow = stream.updateSendWindow(0); + int window = Math.min(streamSendWindow, sessionSendWindow); + if (window <= 0 && toWrite > 0) + return false; - int window = Math.min(streamSendWindow, sessionSendWindow); + int length = Math.min(toWrite, window); - int length = this.length = Math.min(flowControlLength, window); - if (LOG.isDebugEnabled()) - LOG.debug("Generated {}, length/window={}/{}", frame, length, window); + int generated = generator.data(lease, (DataFrame)frame, length); + if (LOG.isDebugEnabled()) + LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, generated, window, toWrite); - generator.data(lease, (DataFrame)frame, length); - flowControl.onDataSending(stream, length); - return null; - } - catch (Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Failure generating frame " + frame, x); - return x; - } + this.generated += generated; + this.remaining -= generated; + + flowControl.onDataSending(stream, generated); + + return true; } @Override public void succeeded() { - flowControl.onDataSent(stream, length); + flowControl.onDataSent(stream, generated); + generated = 0; // Do we have more to send ? DataFrame dataFrame = (DataFrame)frame; - if (dataFrame.remaining() > 0) - { - // We have written part of the frame, but there is more to write. - // The API will not allow to send two data frames for the same - // stream so we append the unfinished frame at the end to allow - // better interleaving with other streams. - flusher.append(this); - } - else + if (dataRemaining() == 0) { // Only now we can update the close state // and eventually remove the stream. diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java index b859ec6e2e5..66bfcb75151 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/DataGenerator.java @@ -36,43 +36,34 @@ public class DataGenerator this.headerGenerator = headerGenerator; } - public void generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) + public int generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) { - generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength); + return generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength); } - public void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength) + public int generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength) { if (streamId < 0) throw new IllegalArgumentException("Invalid stream id: " + streamId); int dataLength = data.remaining(); int maxFrameSize = headerGenerator.getMaxFrameSize(); - if (dataLength <= maxLength && dataLength <= maxFrameSize) + int length = Math.min(dataLength, Math.min(maxFrameSize, maxLength)); + if (length == dataLength) { - // Single frame. generateFrame(lease, streamId, data, last); - return; } - - // Other cases, we need to slice the original buffer into multiple frames. - - int length = Math.min(maxLength, dataLength); - int frames = length / maxFrameSize; - if (frames * maxFrameSize != length) - ++frames; - - int begin = data.position(); - int end = data.limit(); - for (int i = 1; i <= frames; ++i) + else { - int limit = begin + Math.min(maxFrameSize * i, length); - data.limit(limit); + int limit = data.limit(); + int newLimit = data.position() + length; + data.limit(newLimit); ByteBuffer slice = data.slice(); - data.position(limit); - generateFrame(lease, streamId, slice, i == frames && last && limit == end); + data.position(newLimit); + data.limit(limit); + generateFrame(lease, streamId, slice, false); } - data.limit(end); + return length; } private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last) @@ -88,6 +79,7 @@ public class DataGenerator BufferUtil.flipToFlush(header, 0); lease.append(header, true); - lease.append(data, false); + if (data.remaining() > 0) + lease.append(data, false); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java index 961e2de9b8b..a86c26ea4eb 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/generator/Generator.java @@ -80,8 +80,8 @@ public class Generator generators[frame.getType().getType()].generate(lease, frame); } - public void data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) + public int data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) { - dataGenerator.generate(lease, frame, maxLength); + return dataGenerator.generate(lease, frame, maxLength); } } diff --git a/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/DataGenerateParseTest.java b/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/DataGenerateParseTest.java index c8a12ef3474..d730589a70a 100644 --- a/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/DataGenerateParseTest.java +++ b/jetty-http2/http2-common/src/test/java/org/eclipse/jetty/http2/frames/DataGenerateParseTest.java @@ -103,7 +103,14 @@ public class DataGenerateParseTest for (int i = 0; i < 2; ++i) { ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); - generator.generateData(lease, 13, data.slice(), true, data.remaining()); + ByteBuffer slice = data.slice(); + int generated = 0; + while (true) + { + generated += generator.generateData(lease, 13, slice, true, slice.remaining()); + if (generated == data.remaining()) + break; + } frames.clear(); for (ByteBuffer buffer : lease.getByteBuffers()) @@ -135,7 +142,14 @@ public class DataGenerateParseTest { ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); ByteBuffer data = ByteBuffer.wrap(largeContent); - generator.generateData(lease, 13, data.slice(), true, data.remaining()); + ByteBuffer slice = data.slice(); + int generated = 0; + while (true) + { + generated += generator.generateData(lease, 13, slice, true, slice.remaining()); + if (generated == data.remaining()) + break; + } frames.clear(); for (ByteBuffer buffer : lease.getByteBuffers())