Issue #360 (Improve HTTP/2 stream interleaving)

Fixed by making the interleave quantum be the frame size rather than
the flow control window size.

Reworked HTTP2Flusher.process() to be simpler and properly
interleave frames.
This commit is contained in:
Simone Bordet 2016-02-29 18:45:30 +01:00
parent 2c11372bc4
commit bd62320285
8 changed files with 451 additions and 203 deletions

View File

@ -88,6 +88,56 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testRequestNoContentResponseEmptyContent() throws Exception
{
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, false), new Callback()
{
@Override
public void succeeded()
{
stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), NOOP);
}
});
return null;
}
});
Session session = newClient(new Session.Listener.Adapter());
HttpFields fields = new HttpFields();
MetaData.Request metaData = newRequest("GET", fields);
HeadersFrame frame = new HeadersFrame(metaData, null, true);
final CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
Assert.assertFalse(frame.isEndStream());
Assert.assertEquals(stream.getId(), frame.getStreamId());
MetaData.Response response = (MetaData.Response)frame.getMetaData();
Assert.assertEquals(200, response.getStatus());
}
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
Assert.assertTrue(frame.isEndStream());
callback.succeeded();
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test @Test
public void testRequestNoContentResponseContent() throws Exception public void testRequestNoContentResponseContent() throws Exception
{ {

View File

@ -0,0 +1,231 @@
//
// ========================================================================
// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class InterleavingTest extends AbstractTest
{
@Test
public void testInterleaving() throws Exception
{
CountDownLatch serverStreamsLatch = new CountDownLatch(2);
List<Stream> serverStreams = new ArrayList<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
serverStreams.add(stream);
serverStreamsLatch.countDown();
return null;
}
});
int maxFrameSize = Frame.DEFAULT_MAX_LENGTH + 1;
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_FRAME_SIZE, maxFrameSize);
return settings;
}
});
BlockingQueue<FrameBytesCallback> dataFrames = new LinkedBlockingDeque<>();
Stream.Listener streamListener = new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
ByteBuffer data = frame.getData();
byte[] bytes = new byte[data.remaining()];
data.get(bytes);
dataFrames.offer(new FrameBytesCallback(frame, bytes, callback));
}
};
HeadersFrame headersFrame1 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true);
FuturePromise<Stream> streamPromise1 = new FuturePromise<>();
session.newStream(headersFrame1, streamPromise1, streamListener);
streamPromise1.get(5, TimeUnit.SECONDS);
HeadersFrame headersFrame2 = new HeadersFrame(newRequest("GET", new HttpFields()), null, true);
FuturePromise<Stream> streamPromise2 = new FuturePromise<>();
session.newStream(headersFrame2, streamPromise2, streamListener);
streamPromise2.get(5, TimeUnit.SECONDS);
Assert.assertTrue(serverStreamsLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(1000);
Stream serverStream1 = serverStreams.get(0);
Stream serverStream2 = serverStreams.get(1);
MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
serverStream1.headers(new HeadersFrame(serverStream1.getId(), response1, null, false), Callback.NOOP);
Random random = new Random();
byte[] content1 = new byte[2 * ((ISession)serverStream1.getSession()).updateSendWindow(0)];
random.nextBytes(content1);
byte[] content2 = new byte[2 * ((ISession)serverStream2.getSession()).updateSendWindow(0)];
random.nextBytes(content2);
MetaData.Response response2 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
serverStream2.headers(new HeadersFrame(serverStream2.getId(), response2, null, false), new Callback()
{
@Override
public void succeeded()
{
// Write data for both streams from within the callback so that they get queued together.
ByteBuffer buffer1 = ByteBuffer.wrap(content1);
serverStream1.data(new DataFrame(serverStream1.getId(), buffer1, true), NOOP);
ByteBuffer buffer2 = ByteBuffer.wrap(content2);
serverStream2.data(new DataFrame(serverStream2.getId(), buffer2, true), NOOP);
}
});
// The client reads with a buffer size that is different from the
// frame size and synthesizes DATA frames, so expect N frames for
// stream1 up to maxFrameSize of data, then M frames for stream2
// up to maxFrameSize of data, and so forth, interleaved.
Map<Integer, ByteArrayOutputStream> contents = new HashMap<>();
contents.put(serverStream1.getId(), new ByteArrayOutputStream());
contents.put(serverStream2.getId(), new ByteArrayOutputStream());
List<StreamLength> streamLengths = new ArrayList<>();
int finished = 0;
while (finished < 2)
{
FrameBytesCallback frameBytesCallback = dataFrames.poll(5, TimeUnit.SECONDS);
if (frameBytesCallback == null)
Assert.fail();
DataFrame dataFrame = frameBytesCallback.frame;
int streamId = dataFrame.getStreamId();
int length = dataFrame.remaining();
streamLengths.add(new StreamLength(streamId, length));
if (dataFrame.isEndStream())
++finished;
contents.get(streamId).write(frameBytesCallback.bytes);
frameBytesCallback.callback.succeeded();
}
// Verify that the content has been sent properly.
Assert.assertArrayEquals(content1, contents.get(serverStream1.getId()).toByteArray());
Assert.assertArrayEquals(content2, contents.get(serverStream2.getId()).toByteArray());
// Verify that the interleaving is correct.
Map<Integer, List<Integer>> groups = new HashMap<>();
groups.put(serverStream1.getId(), new ArrayList<>());
groups.put(serverStream2.getId(), new ArrayList<>());
int currentStream = 0;
int currentLength = 0;
for (StreamLength streamLength : streamLengths)
{
if (currentStream == 0)
currentStream = streamLength.stream;
if (currentStream != streamLength.stream)
{
groups.get(currentStream).add(currentLength);
currentStream = streamLength.stream;
currentLength = 0;
}
currentLength += streamLength.length;
}
groups.get(currentStream).add(currentLength);
Logger logger = Log.getLogger(getClass());
logger.debug("frame lengths = {}", streamLengths);
groups.forEach((stream, lengths) ->
{
logger.debug("stream {} interleaved lengths = {}", stream, lengths);
for (Integer length : lengths)
Assert.assertThat(length, Matchers.lessThanOrEqualTo(maxFrameSize));
});
}
private static class FrameBytesCallback
{
private final DataFrame frame;
private final byte[] bytes;
private final Callback callback;
private FrameBytesCallback(DataFrame frame, byte[] bytes, Callback callback)
{
this.frame = frame;
this.bytes = bytes;
this.callback = callback;
}
}
private static class StreamLength
{
private final int stream;
private final int length;
private StreamLength(int stream, int length)
{
this.stream = stream;
this.length = length;
}
@Override
public String toString()
{
return String.format("(%d,%d)", stream, length);
}
}
}

View File

@ -131,24 +131,23 @@ public class StreamCloseTest extends AbstractTest
{ {
Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed()); Assert.assertTrue(((HTTP2Stream)stream).isRemotelyClosed());
// We must copy the data that we send asynchronously.
ByteBuffer data = frame.getData();
ByteBuffer copy = ByteBuffer.allocate(data.remaining());
copy.put(data).flip();
completable.thenRun(() -> completable.thenRun(() ->
{ stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback()
// We must copy the data that we send asynchronously.
ByteBuffer data = frame.getData();
ByteBuffer copy = ByteBuffer.allocate(data.remaining());
copy.put(data).flip();
stream.data(new DataFrame(stream.getId(), copy, frame.isEndStream()), new Callback()
{
@Override
public void succeeded()
{ {
Assert.assertTrue(stream.isClosed()); @Override
Assert.assertEquals(0, stream.getSession().getStreams().size()); public void succeeded()
callback.succeeded(); {
serverDataLatch.countDown(); Assert.assertTrue(stream.isClosed());
} Assert.assertEquals(0, stream.getSession().getStreams().size());
}); callback.succeeded();
}); serverDataLatch.countDown();
}
}));
} }
}; };
} }

View File

@ -22,9 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.Frame;
@ -43,11 +41,11 @@ public class HTTP2Flusher extends IteratingCallback
private final Queue<WindowEntry> windows = new ArrayDeque<>(); private final Queue<WindowEntry> windows = new ArrayDeque<>();
private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this); private final ArrayQueue<Entry> frames = new ArrayQueue<>(ArrayQueue.DEFAULT_CAPACITY, ArrayQueue.DEFAULT_GROWTH, this);
private final Map<IStream, Integer> streams = new HashMap<>(); private final Queue<Entry> entries = new ArrayDeque<>();
private final List<Entry> resets = new ArrayList<>();
private final List<Entry> actives = new ArrayList<>(); private final List<Entry> actives = new ArrayList<>();
private final HTTP2Session session; private final HTTP2Session session;
private final ByteBufferPool.Lease lease; private final ByteBufferPool.Lease lease;
private Entry stalled;
private boolean terminated; private boolean terminated;
public HTTP2Flusher(HTTP2Session session) public HTTP2Flusher(HTTP2Session session)
@ -106,17 +104,6 @@ public class HTTP2Flusher extends IteratingCallback
return !closed; return !closed;
} }
private Entry remove(int index)
{
synchronized (this)
{
if (index == 0)
return frames.pollUnsafe();
else
return frames.remove(index);
}
}
public int getQueueSize() public int getQueueSize()
{ {
synchronized (this) synchronized (this)
@ -136,112 +123,75 @@ public class HTTP2Flusher extends IteratingCallback
if (terminated) if (terminated)
throw new ClosedChannelException(); throw new ClosedChannelException();
// First thing, update the window sizes, so we can
// reason about the frames to remove from the queue.
while (!windows.isEmpty()) while (!windows.isEmpty())
{ {
WindowEntry entry = windows.poll(); WindowEntry entry = windows.poll();
entry.perform(); entry.perform();
} }
// Now the window sizes cannot change. if (!frames.isEmpty())
// Window updates that happen concurrently will
// be queued and processed on the next iteration.
int sessionWindow = session.getSendWindow();
int index = 0;
int size = frames.size();
while (index < size)
{ {
Entry entry = frames.get(index); for (Entry entry : frames)
IStream stream = entry.stream;
// If the stream has been reset, don't send the frame.
if (stream != null && stream.isReset() && !entry.isProtocol())
{ {
remove(index); entries.offer(entry);
--size; actives.add(entry);
resets.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for reset {}", entry);
continue;
} }
frames.clear();
// Check if the frame fits in the flow control windows.
int remaining = entry.dataRemaining();
if (remaining > 0)
{
if (sessionWindow <= 0)
{
++index;
// There may be *non* flow controlled frames to send.
continue;
}
if (stream != null)
{
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
streamWindow = stream.updateSendWindow(0);
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
++index;
// There may be *non* flow controlled frames to send.
continue;
}
}
// The frame fits both flow control windows, reduce them.
sessionWindow -= remaining;
if (stream != null)
streams.put(stream, streams.get(stream) - remaining);
}
// The frame will be written, remove it from the queue.
remove(index);
--size;
actives.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for write {}", entry);
} }
streams.clear();
} }
// Perform resets outside the sync block.
for (int i = 0; i < resets.size(); ++i)
{
Entry entry = resets.get(i);
entry.reset();
}
resets.clear();
if (actives.isEmpty()) if (entries.isEmpty())
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session); LOG.debug("Flushed {}", session);
return Action.IDLE; return Action.IDLE;
} }
for (int i = 0; i < actives.size(); ++i) while (!entries.isEmpty())
{ {
Entry entry = actives.get(i); Entry entry = entries.poll();
Throwable failure = entry.generate(lease); if (LOG.isDebugEnabled())
if (failure != null) LOG.debug("Processing {}", entry);
// If the stream has been reset, don't send the frame.
if (entry.reset())
{
if (LOG.isDebugEnabled())
LOG.debug("Resetting {}", entry);
continue;
}
try
{
if (entry.generate(lease))
{
if (entry.dataRemaining() > 0)
entries.offer(entry);
}
else
{
if (stalled == null)
stalled = entry;
}
}
catch (Throwable failure)
{ {
// Failure to generate the entry is catastrophic. // Failure to generate the entry is catastrophic.
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + entry.frame, failure);
failed(failure); failed(failure);
return Action.SUCCEEDED; return Action.SUCCEEDED;
} }
} }
List<ByteBuffer> byteBuffers = lease.getByteBuffers(); List<ByteBuffer> byteBuffers = lease.getByteBuffers();
if (byteBuffers.isEmpty())
{
complete();
return Action.IDLE;
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives); LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()])); session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
@ -251,17 +201,45 @@ public class HTTP2Flusher extends IteratingCallback
@Override @Override
public void succeeded() public void succeeded()
{ {
lease.recycle();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Written {} frames for {}", actives.size(), actives); LOG.debug("Written {} frames for {}", actives.size(), actives);
actives.forEach(Entry::succeeded); complete();
actives.clear();
super.succeeded(); super.succeeded();
} }
private void complete()
{
lease.recycle();
actives.forEach(Entry::complete);
if (stalled != null)
{
// We have written part of the frame, but there is more to write.
// The API will not allow to send two data frames for the same
// stream so we append the unfinished frame at the end to allow
// better interleaving with other streams.
int index = actives.indexOf(stalled);
for (int i = index; i < actives.size(); ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
append(entry);
}
for (int i = 0; i < index; ++i)
{
Entry entry = actives.get(i);
if (entry.dataRemaining() > 0)
append(entry);
}
stalled = null;
}
actives.clear();
}
@Override @Override
protected void onCompleteSuccess() protected void onCompleteSuccess()
{ {
@ -317,6 +295,7 @@ public class HTTP2Flusher extends IteratingCallback
protected final Frame frame; protected final Frame frame;
protected final IStream stream; protected final IStream stream;
protected final Callback callback; protected final Callback callback;
private boolean reset;
protected Entry(Frame frame, IStream stream, Callback callback) protected Entry(Frame frame, IStream stream, Callback callback)
{ {
@ -330,14 +309,14 @@ public class HTTP2Flusher extends IteratingCallback
return 0; return 0;
} }
public Throwable generate(ByteBufferPool.Lease lease) protected abstract boolean generate(ByteBufferPool.Lease lease);
{
return null;
}
public void reset() private void complete()
{ {
failed(new EofException("reset")); if (reset)
failed(new EofException("reset"));
else
succeeded();
} }
@Override @Override
@ -351,7 +330,12 @@ public class HTTP2Flusher extends IteratingCallback
callback.failed(x); callback.failed(x);
} }
public boolean isProtocol() private boolean reset()
{
return this.reset = stream != null && stream.isReset() && !isProtocol();
}
private boolean isProtocol()
{ {
switch (frame.getType()) switch (frame.getType())
{ {

View File

@ -1048,22 +1048,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
super(frame, stream, callback); super(frame, stream, callback);
} }
public Throwable generate(ByteBufferPool.Lease lease) protected boolean generate(ByteBufferPool.Lease lease)
{ {
try generator.control(lease, frame);
{ if (LOG.isDebugEnabled())
generator.control(lease, frame); LOG.debug("Generated {}", frame);
if (LOG.isDebugEnabled()) prepare();
LOG.debug("Generated {}", frame); return true;
prepare();
return null;
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + frame, x);
return x;
}
} }
/** /**
@ -1154,71 +1145,58 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private class DataEntry extends HTTP2Flusher.Entry private class DataEntry extends HTTP2Flusher.Entry
{ {
private int length; private int remaining;
private int generated;
private DataEntry(DataFrame frame, IStream stream, Callback callback) private DataEntry(DataFrame frame, IStream stream, Callback callback)
{ {
super(frame, stream, callback); super(frame, stream, callback);
}
@Override
public int dataRemaining()
{
// We don't do any padding, so the flow control length is // We don't do any padding, so the flow control length is
// always the data remaining. This simplifies the handling // always the data remaining. This simplifies the handling
// of data frames that cannot be completely written due to // of data frames that cannot be completely written due to
// the flow control window exhausting, since in that case // the flow control window exhausting, since in that case
// we would have to count the padding only once. // we would have to count the padding only once.
return ((DataFrame)frame).remaining(); remaining = frame.remaining();
} }
public Throwable generate(ByteBufferPool.Lease lease) @Override
public int dataRemaining()
{ {
try return remaining;
{ }
int flowControlLength = dataRemaining();
int sessionSendWindow = getSendWindow(); protected boolean generate(ByteBufferPool.Lease lease)
if (sessionSendWindow < 0) {
throw new IllegalStateException(); int toWrite = dataRemaining();
int streamSendWindow = stream.updateSendWindow(0); int sessionSendWindow = getSendWindow();
if (streamSendWindow < 0) int streamSendWindow = stream.updateSendWindow(0);
throw new IllegalStateException(); int window = Math.min(streamSendWindow, sessionSendWindow);
if (window <= 0 && toWrite > 0)
return false;
int window = Math.min(streamSendWindow, sessionSendWindow); int length = Math.min(toWrite, window);
int length = this.length = Math.min(flowControlLength, window); int generated = generator.data(lease, (DataFrame)frame, length);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Generated {}, length/window={}/{}", frame, length, window); LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, generated, window, toWrite);
generator.data(lease, (DataFrame)frame, length); this.generated += generated;
flowControl.onDataSending(stream, length); this.remaining -= generated;
return null;
} flowControl.onDataSending(stream, generated);
catch (Throwable x)
{ return true;
if (LOG.isDebugEnabled())
LOG.debug("Failure generating frame " + frame, x);
return x;
}
} }
@Override @Override
public void succeeded() public void succeeded()
{ {
flowControl.onDataSent(stream, length); flowControl.onDataSent(stream, generated);
generated = 0;
// Do we have more to send ? // Do we have more to send ?
DataFrame dataFrame = (DataFrame)frame; DataFrame dataFrame = (DataFrame)frame;
if (dataFrame.remaining() > 0) if (dataRemaining() == 0)
{
// We have written part of the frame, but there is more to write.
// The API will not allow to send two data frames for the same
// stream so we append the unfinished frame at the end to allow
// better interleaving with other streams.
flusher.append(this);
}
else
{ {
// Only now we can update the close state // Only now we can update the close state
// and eventually remove the stream. // and eventually remove the stream.

View File

@ -36,43 +36,34 @@ public class DataGenerator
this.headerGenerator = headerGenerator; this.headerGenerator = headerGenerator;
} }
public void generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) public int generate(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{ {
generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength); return generateData(lease, frame.getStreamId(), frame.getData(), frame.isEndStream(), maxLength);
} }
public void generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength) public int generateData(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last, int maxLength)
{ {
if (streamId < 0) if (streamId < 0)
throw new IllegalArgumentException("Invalid stream id: " + streamId); throw new IllegalArgumentException("Invalid stream id: " + streamId);
int dataLength = data.remaining(); int dataLength = data.remaining();
int maxFrameSize = headerGenerator.getMaxFrameSize(); int maxFrameSize = headerGenerator.getMaxFrameSize();
if (dataLength <= maxLength && dataLength <= maxFrameSize) int length = Math.min(dataLength, Math.min(maxFrameSize, maxLength));
if (length == dataLength)
{ {
// Single frame.
generateFrame(lease, streamId, data, last); generateFrame(lease, streamId, data, last);
return;
} }
else
// Other cases, we need to slice the original buffer into multiple frames.
int length = Math.min(maxLength, dataLength);
int frames = length / maxFrameSize;
if (frames * maxFrameSize != length)
++frames;
int begin = data.position();
int end = data.limit();
for (int i = 1; i <= frames; ++i)
{ {
int limit = begin + Math.min(maxFrameSize * i, length); int limit = data.limit();
data.limit(limit); int newLimit = data.position() + length;
data.limit(newLimit);
ByteBuffer slice = data.slice(); ByteBuffer slice = data.slice();
data.position(limit); data.position(newLimit);
generateFrame(lease, streamId, slice, i == frames && last && limit == end); data.limit(limit);
generateFrame(lease, streamId, slice, false);
} }
data.limit(end); return length;
} }
private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last) private void generateFrame(ByteBufferPool.Lease lease, int streamId, ByteBuffer data, boolean last)
@ -88,6 +79,7 @@ public class DataGenerator
BufferUtil.flipToFlush(header, 0); BufferUtil.flipToFlush(header, 0);
lease.append(header, true); lease.append(header, true);
lease.append(data, false); if (data.remaining() > 0)
lease.append(data, false);
} }
} }

View File

@ -80,8 +80,8 @@ public class Generator
generators[frame.getType().getType()].generate(lease, frame); generators[frame.getType().getType()].generate(lease, frame);
} }
public void data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength) public int data(ByteBufferPool.Lease lease, DataFrame frame, int maxLength)
{ {
dataGenerator.generate(lease, frame, maxLength); return dataGenerator.generate(lease, frame, maxLength);
} }
} }

View File

@ -103,7 +103,14 @@ public class DataGenerateParseTest
for (int i = 0; i < 2; ++i) for (int i = 0; i < 2; ++i)
{ {
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.generateData(lease, 13, data.slice(), true, data.remaining()); ByteBuffer slice = data.slice();
int generated = 0;
while (true)
{
generated += generator.generateData(lease, 13, slice, true, slice.remaining());
if (generated == data.remaining())
break;
}
frames.clear(); frames.clear();
for (ByteBuffer buffer : lease.getByteBuffers()) for (ByteBuffer buffer : lease.getByteBuffers())
@ -135,7 +142,14 @@ public class DataGenerateParseTest
{ {
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
ByteBuffer data = ByteBuffer.wrap(largeContent); ByteBuffer data = ByteBuffer.wrap(largeContent);
generator.generateData(lease, 13, data.slice(), true, data.remaining()); ByteBuffer slice = data.slice();
int generated = 0;
while (true)
{
generated += generator.generateData(lease, 13, slice, true, slice.remaining());
if (generated == data.remaining())
break;
}
frames.clear(); frames.clear();
for (ByteBuffer buffer : lease.getByteBuffers()) for (ByteBuffer buffer : lease.getByteBuffers())