441020 - Support HEADERS followed by CONTINUATION+.

This commit is contained in:
Simone Bordet 2015-07-08 18:55:47 +02:00
parent 22cea067d7
commit c367ea8a85
11 changed files with 496 additions and 98 deletions

View File

@ -63,7 +63,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(SESSION_PROMISE_CONTEXT_KEY);
Generator generator = new Generator(byteBufferPool, 4096);
Generator generator = new Generator(byteBufferPool);
FlowControlStrategy flowControl = newFlowControlStrategy();
HTTP2ClientSession session = new HTTP2ClientSession(scheduler, endPoint, generator, listener, flowControl);
Parser parser = new Parser(byteBufferPool, session, 4096, 8192);

View File

@ -34,18 +34,18 @@ public class Generator
public Generator(ByteBufferPool byteBufferPool)
{
this(byteBufferPool, 4096);
this(byteBufferPool, 4096, 0);
}
public Generator(ByteBufferPool byteBufferPool, int headerTableSize)
public Generator(ByteBufferPool byteBufferPool, int maxDynamicTableSize, int maxHeaderBlockFragment)
{
this.byteBufferPool = byteBufferPool;
headerGenerator = new HeaderGenerator();
hpackEncoder = new HpackEncoder(headerTableSize);
hpackEncoder = new HpackEncoder(maxDynamicTableSize);
this.generators = new FrameGenerator[FrameType.values().length];
this.generators[FrameType.HEADERS.getType()] = new HeadersGenerator(headerGenerator, hpackEncoder);
this.generators[FrameType.HEADERS.getType()] = new HeadersGenerator(headerGenerator, hpackEncoder, maxHeaderBlockFragment);
this.generators[FrameType.PRIORITY.getType()] = new PriorityGenerator(headerGenerator);
this.generators[FrameType.RST_STREAM.getType()] = new ResetGenerator(headerGenerator);
this.generators[FrameType.SETTINGS.getType()] = new SettingsGenerator(headerGenerator);
@ -53,7 +53,7 @@ public class Generator
this.generators[FrameType.PING.getType()] = new PingGenerator(headerGenerator);
this.generators[FrameType.GO_AWAY.getType()] = new GoAwayGenerator(headerGenerator);
this.generators[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateGenerator(headerGenerator);
this.generators[FrameType.CONTINUATION.getType()] = null; // TODO
this.generators[FrameType.CONTINUATION.getType()] = null; // Never generated explicitly.
this.generators[FrameType.PREFACE.getType()] = new PrefaceGenerator();
this.generators[FrameType.DISCONNECT.getType()] = new DisconnectGenerator();

View File

@ -32,11 +32,18 @@ import org.eclipse.jetty.util.BufferUtil;
public class HeadersGenerator extends FrameGenerator
{
private final HpackEncoder encoder;
private final int maxHeaderBlockFragment;
public HeadersGenerator(HeaderGenerator headerGenerator, HpackEncoder encoder)
{
this(headerGenerator, encoder, 0);
}
public HeadersGenerator(HeaderGenerator headerGenerator, HpackEncoder encoder, int maxHeaderBlockFragment)
{
super(headerGenerator);
this.encoder = encoder;
this.maxHeaderBlockFragment = maxHeaderBlockFragment;
}
@Override
@ -53,22 +60,50 @@ public class HeadersGenerator extends FrameGenerator
int maxFrameSize = getMaxFrameSize();
// The lease may already contain other buffers,
// compute the bytes generated by the encoder only.
int leaseSize = lease.getSize();
long leaseLength = lease.getTotalLength();
encoder.encode(metaData, lease, maxFrameSize);
long headersLength = lease.getTotalLength() - leaseLength;
if (headersLength > maxFrameSize)
throw new IllegalArgumentException(String.format("Invalid headers, too big for max frame size: %d > %d", headersLength, maxFrameSize));
ByteBuffer hpacked = lease.acquire(maxFrameSize, false);
BufferUtil.clearToFill(hpacked);
encoder.encode(hpacked, metaData);
int hpackedLength = hpacked.position();
BufferUtil.flipToFlush(hpacked, 0);
int flags = Flags.END_HEADERS;
if (!contentFollows)
flags |= Flags.END_STREAM;
// Split into CONTINUATION frames if necessary.
if (maxHeaderBlockFragment > 0 && hpackedLength > maxHeaderBlockFragment)
{
int flags = contentFollows ? Flags.NONE : Flags.END_STREAM;
ByteBuffer header = generateHeader(lease, FrameType.HEADERS, maxHeaderBlockFragment, flags, streamId);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
hpacked.limit(maxHeaderBlockFragment);
lease.append(hpacked.slice(), false);
ByteBuffer header = generateHeader(lease, FrameType.HEADERS, (int)headersLength, flags, streamId);
int position = maxHeaderBlockFragment;
int limit = position + maxHeaderBlockFragment;
while (limit < hpackedLength)
{
hpacked.position(position).limit(limit);
header = generateHeader(lease, FrameType.CONTINUATION, maxHeaderBlockFragment, Flags.NONE, streamId);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
lease.append(hpacked.slice(), false);
position += maxHeaderBlockFragment;
limit += maxHeaderBlockFragment;
}
BufferUtil.flipToFlush(header, 0);
lease.insert(leaseSize, header, true);
hpacked.position(position).limit(hpackedLength);
header = generateHeader(lease, FrameType.CONTINUATION, hpacked.remaining(), Flags.END_HEADERS, streamId);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
lease.append(hpacked, true);
}
else
{
int flags = Flags.END_HEADERS;
if (!contentFollows)
flags |= Flags.END_STREAM;
ByteBuffer header = generateHeader(lease, FrameType.HEADERS, hpackedLength, flags, streamId);
BufferUtil.flipToFlush(header, 0);
lease.append(header, true);
lease.append(hpacked, true);
}
}
}

View File

@ -54,28 +54,24 @@ public class PushPromiseGenerator extends FrameGenerator
throw new IllegalArgumentException("Invalid promised stream id: " + promisedStreamId);
int maxFrameSize = getMaxFrameSize();
// The promised streamId.
int fixedLength = 4;
maxFrameSize -= fixedLength;
// The promised streamId space.
int extraSpace = 4;
maxFrameSize -= extraSpace;
// The lease may already contain other buffers,
// compute the bytes generated by the encoder only.
int leaseSize = lease.getSize();
long leaseLength = lease.getTotalLength();
encoder.encode(metaData, lease, maxFrameSize);
long headersLength = lease.getTotalLength() - leaseLength;
if (headersLength > maxFrameSize)
throw new IllegalArgumentException(String.format("Invalid headers, too big for max frame size: %d > %d", headersLength, maxFrameSize));
// Space for the promised streamId.
headersLength += fixedLength;
ByteBuffer hpacked = lease.acquire(maxFrameSize, false);
BufferUtil.clearToFill(hpacked);
encoder.encode(hpacked, metaData);
int hpackedLength = hpacked.position();
BufferUtil.flipToFlush(hpacked, 0);
int length = hpackedLength + extraSpace;
int flags = Flags.END_HEADERS;
ByteBuffer header = generateHeader(lease, FrameType.PUSH_PROMISE, (int)headersLength, flags, streamId);
ByteBuffer header = generateHeader(lease, FrameType.PUSH_PROMISE, length, flags, streamId);
header.putInt(promisedStreamId);
BufferUtil.flipToFlush(header, 0);
lease.insert(leaseSize, header, true);
lease.append(header, true);
lease.append(hpacked, true);
}
}

View File

@ -21,22 +21,95 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.HeadersFrame;
public class ContinuationBodyParser extends BodyParser
{
private final HeaderBlockParser headerBlockParser;
private final HeaderBlockFragments headerBlockFragments;
private State state = State.PREPARE;
private int length;
public ContinuationBodyParser(HeaderParser headerParser, Parser.Listener listener, HeaderBlockParser headerBlockParser)
public ContinuationBodyParser(HeaderParser headerParser, Parser.Listener listener, HeaderBlockParser headerBlockParser, HeaderBlockFragments headerBlockFragments)
{
super(headerParser, listener);
this.headerBlockParser = headerBlockParser;
this.headerBlockFragments = headerBlockFragments;
}
@Override
protected void emptyBody(ByteBuffer buffer)
{
reset();
if (hasFlag(Flags.END_HEADERS))
onHeaders();
}
@Override
public boolean parse(ByteBuffer buffer)
{
MetaData metaData = headerBlockParser.parse(buffer, getBodyLength());
// TODO: CONTINUATION frames are not supported for now, we just parse them to keep HPACK happy.
return metaData != null;
while (buffer.hasRemaining())
{
switch (state)
{
case PREPARE:
{
// SPEC: wrong streamId is treated as connection error.
if (getStreamId() == 0)
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_frame");
if (getStreamId() != headerBlockFragments.getStreamId())
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");
length = getBodyLength();
state = State.FRAGMENT;
break;
}
case FRAGMENT:
{
int remaining = buffer.remaining();
if (remaining < length)
{
headerBlockFragments.storeFragment(buffer, remaining, false);
length -= remaining;
}
else
{
boolean last = hasFlag(Flags.END_HEADERS);
headerBlockFragments.storeFragment(buffer, length, last);
reset();
if (last)
onHeaders();
return true;
}
}
default:
{
throw new IllegalStateException();
}
}
}
return false;
}
private void onHeaders()
{
ByteBuffer headerBlock = headerBlockFragments.complete();
MetaData metaData = headerBlockParser.parse(headerBlock, headerBlock.remaining());
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
notifyHeaders(frame);
}
private void reset()
{
state = State.PREPARE;
length = 0;
}
private enum State
{
PREPARE, FRAGMENT
}
}

View File

@ -0,0 +1,95 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.frames.PriorityFrame;
public class HeaderBlockFragments
{
private PriorityFrame priorityFrame;
private boolean endStream;
private int streamId;
private ByteBuffer storage;
public void storeFragment(ByteBuffer fragment, int length, boolean last)
{
if (storage == null)
{
int space = last ? length : length * 2;
storage = ByteBuffer.allocate(space);
}
// Grow the storage if necessary.
if (storage.remaining() < length)
{
int space = last ? length : length * 2;
int capacity = storage.position() + space;
ByteBuffer newStorage = ByteBuffer.allocate(capacity);
storage.flip();
newStorage.put(storage);
storage = newStorage;
}
// Copy the fragment into the storage.
int limit = fragment.limit();
fragment.limit(fragment.position() + length);
storage.put(fragment);
fragment.limit(limit);
}
public PriorityFrame getPriorityFrame()
{
return priorityFrame;
}
public void setPriorityFrame(PriorityFrame priorityFrame)
{
this.priorityFrame = priorityFrame;
}
public boolean isEndStream()
{
return endStream;
}
public void setEndStream(boolean endStream)
{
this.endStream = endStream;
}
public ByteBuffer complete()
{
ByteBuffer result = storage;
storage = null;
result.flip();
return result;
}
public int getStreamId()
{
return streamId;
}
public void setStreamId(int streamId)
{
this.streamId = streamId;
}
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.util.BufferUtil;
public class HeadersBodyParser extends BodyParser
{
private final HeaderBlockParser headerBlockParser;
private final HeaderBlockFragments headerBlockFragments;
private State state = State.PREPARE;
private int cursor;
private int length;
@ -38,10 +39,11 @@ public class HeadersBodyParser extends BodyParser
private int streamId;
private int weight;
public HeadersBodyParser(HeaderParser headerParser, Parser.Listener listener, HeaderBlockParser headerBlockParser)
public HeadersBodyParser(HeaderParser headerParser, Parser.Listener listener, HeaderBlockParser headerBlockParser, HeaderBlockFragments headerBlockFragments)
{
super(headerParser, listener);
this.headerBlockParser = headerBlockParser;
this.headerBlockFragments = headerBlockFragments;
}
private void reset()
@ -58,8 +60,18 @@ public class HeadersBodyParser extends BodyParser
@Override
protected void emptyBody(ByteBuffer buffer)
{
MetaData metaData = headerBlockParser.parse(BufferUtil.EMPTY_BUFFER, 0);
onHeaders(0, 0, false, metaData);
if (hasFlag(Flags.END_HEADERS))
{
MetaData metaData = headerBlockParser.parse(BufferUtil.EMPTY_BUFFER, 0);
onHeaders(0, 0, false, metaData);
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(streamId, getStreamId(), weight, exclusive));
}
reset();
}
@ -77,10 +89,6 @@ public class HeadersBodyParser extends BodyParser
if (getStreamId() == 0)
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
// For now we don't support HEADERS frames that don't have END_HEADERS.
if (!hasFlag(Flags.END_HEADERS))
return connectionFailure(buffer, ErrorCode.INTERNAL_ERROR.code, "unsupported_headers_frame");
length = getBodyLength();
if (isPadding())
@ -162,12 +170,34 @@ public class HeadersBodyParser extends BodyParser
}
case HEADERS:
{
MetaData metaData = headerBlockParser.parse(buffer, length);
if (metaData != null)
if (hasFlag(Flags.END_HEADERS))
{
state = State.PADDING;
loop = paddingLength == 0;
onHeaders(streamId, weight, exclusive, metaData);
MetaData metaData = headerBlockParser.parse(buffer, length);
if (metaData != null)
{
state = State.PADDING;
loop = paddingLength == 0;
onHeaders(streamId, weight, exclusive, metaData);
}
}
else
{
int remaining = buffer.remaining();
if (remaining < length)
{
headerBlockFragments.storeFragment(buffer, remaining, false);
length -= remaining;
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(streamId, getStreamId(), weight, exclusive));
headerBlockFragments.storeFragment(buffer, length, false);
state = State.PADDING;
loop = paddingLength == 0;
}
}
break;
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.http2.parser;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
@ -49,6 +50,7 @@ public class Parser
private final Listener listener;
private final HeaderParser headerParser;
private final BodyParser[] bodyParsers;
private boolean continuation;
private State state = State.HEADER;
public Parser(ByteBufferPool byteBufferPool, Listener listener, int maxDynamicTableSize, int maxHeaderSize)
@ -58,9 +60,10 @@ public class Parser
this.bodyParsers = new BodyParser[FrameType.values().length];
HeaderBlockParser headerBlockParser = new HeaderBlockParser(byteBufferPool, new HpackDecoder(maxDynamicTableSize, maxHeaderSize));
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments();
bodyParsers[FrameType.DATA.getType()] = new DataBodyParser(headerParser, listener);
bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser);
bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments);
bodyParsers[FrameType.PRIORITY.getType()] = new PriorityBodyParser(headerParser, listener);
bodyParsers[FrameType.RST_STREAM.getType()] = new ResetBodyParser(headerParser, listener);
bodyParsers[FrameType.SETTINGS.getType()] = new SettingsBodyParser(headerParser, listener);
@ -68,7 +71,7 @@ public class Parser
bodyParsers[FrameType.PING.getType()] = new PingBodyParser(headerParser, listener);
bodyParsers[FrameType.GO_AWAY.getType()] = new GoAwayBodyParser(headerParser, listener);
bodyParsers[FrameType.WINDOW_UPDATE.getType()] = new WindowUpdateBodyParser(headerParser, listener);
bodyParsers[FrameType.CONTINUATION.getType()] = new ContinuationBodyParser(headerParser, listener, headerBlockParser);
bodyParsers[FrameType.CONTINUATION.getType()] = new ContinuationBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments);
}
private void reset()
@ -100,6 +103,29 @@ public class Parser
{
if (!headerParser.parse(buffer))
return;
if (continuation)
{
if (headerParser.getFrameType() != FrameType.CONTINUATION.getType())
{
// SPEC: CONTINUATION frames must be consecutive.
BufferUtil.clear(buffer);
notifyConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "continuation_frame_expected");
return;
}
if (headerParser.hasFlag(Flags.END_HEADERS))
{
continuation = false;
}
}
else
{
if (headerParser.getFrameType() == FrameType.HEADERS.getType() &&
!headerParser.hasFlag(Flags.END_HEADERS))
{
continuation = true;
}
}
state = State.BODY;
break;
}

View File

@ -31,30 +31,28 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.http2.hpack.HpackContext.Entry;
import org.eclipse.jetty.http2.hpack.HpackContext.StaticEntry;
import org.eclipse.jetty.io.ByteBufferPool.Lease;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HpackEncoder
{
{
public static final Logger LOG = Log.getLogger(HpackEncoder.class);
private final static HttpField[] __status= new HttpField[599];
final static EnumSet<HttpHeader> __DO_NOT_HUFFMAN =
final static EnumSet<HttpHeader> __DO_NOT_HUFFMAN =
EnumSet.of(
HttpHeader.AUTHORIZATION,
HttpHeader.CONTENT_MD5,
HttpHeader.PROXY_AUTHENTICATE,
HttpHeader.PROXY_AUTHORIZATION);
final static EnumSet<HttpHeader> __DO_NOT_INDEX =
final static EnumSet<HttpHeader> __DO_NOT_INDEX =
EnumSet.of(
// HttpHeader.C_PATH, // TODO more data needed
// HttpHeader.DATE, // TODO more data needed
// HttpHeader.C_PATH, // TODO more data needed
// HttpHeader.DATE, // TODO more data needed
HttpHeader.AUTHORIZATION,
HttpHeader.CONTENT_MD5,
HttpHeader.CONTENT_RANGE,
@ -71,35 +69,35 @@ public class HpackEncoder
HttpHeader.LAST_MODIFIED,
HttpHeader.SET_COOKIE,
HttpHeader.SET_COOKIE2);
final static EnumSet<HttpHeader> __NEVER_INDEX =
final static EnumSet<HttpHeader> __NEVER_INDEX =
EnumSet.of(
HttpHeader.AUTHORIZATION,
HttpHeader.SET_COOKIE,
HttpHeader.SET_COOKIE2);
static
{
for (HttpStatus.Code code : HttpStatus.Code.values())
__status[code.getCode()]=new PreEncodedHttpField(HttpHeader.C_STATUS,Integer.toString(code.getCode()));
}
private final HpackContext _context;
private final boolean _debug;
private int _remoteMaxDynamicTableSize;
private int _localMaxDynamicTableSize;
public HpackEncoder()
{
this(4096,4096);
}
public HpackEncoder(int localMaxDynamicTableSize)
{
this(localMaxDynamicTableSize,4096);
}
public HpackEncoder(int localMaxDynamicTableSize,int remoteMaxDynamicTableSize)
{
_context=new HpackContext(remoteMaxDynamicTableSize);
@ -112,27 +110,17 @@ public class HpackEncoder
{
return _context;
}
public void setRemoteMaxDynamicTableSize(int remoteMaxDynamicTableSize)
{
_remoteMaxDynamicTableSize=remoteMaxDynamicTableSize;
}
public void setLocalMaxDynamicTableSize(int localMaxDynamicTableSize)
{
_localMaxDynamicTableSize=localMaxDynamicTableSize;
}
// TODO better handling of buffer size
public void encode(MetaData metadata,Lease lease,int buffersize)
{
ByteBuffer buffer = lease.acquire(buffersize,false);
lease.append(buffer,true);
BufferUtil.clearToFill(buffer);
encode(buffer,metadata);
BufferUtil.flipToFlush(buffer,0);
}
public void encode(ByteBuffer buffer, MetaData metadata)
{
if (LOG.isDebugEnabled())
@ -188,7 +176,7 @@ public class HpackEncoder
public void encode(ByteBuffer buffer, HttpField field)
{
final int p=_debug?buffer.position():-1;
String encoding=null;
// Is there an entry for the field?
@ -215,18 +203,18 @@ public class HpackEncoder
{
// Unknown field entry, so we will have to send literally.
final boolean indexed;
// But do we know it's name?
HttpHeader header = field.getHeader();
// Select encoding strategy
if (header==null)
{
// Select encoding strategy for unknown header names
Entry name = _context.get(field.getName());
if (field instanceof PreEncodedHttpField)
{
{
int i=buffer.position();
((PreEncodedHttpField)field).putTo(buffer,HttpVersion.HTTP_2);
byte b=buffer.get(i);
@ -237,7 +225,7 @@ public class HpackEncoder
// has the custom header name been seen before?
else if (name==null)
{
// unknown name and value, so let's index this just in case it is
// unknown name and value, so let's index this just in case it is
// the first time we have seen a custom name or a custom field.
// unless the name is changing, this is worthwhile
indexed=true;
@ -257,13 +245,13 @@ public class HpackEncoder
encoding="LitHuffNHuffV!Idx";
}
}
else
else
{
// Select encoding strategy for known header names
Entry name = _context.get(header);
if (field instanceof PreEncodedHttpField)
{
{
// Preencoded field
int i=buffer.position();
((PreEncodedHttpField)field).putTo(buffer,HttpVersion.HTTP_2);
@ -320,9 +308,9 @@ public class HpackEncoder
int e=buffer.position();
if (LOG.isDebugEnabled())
LOG.debug("encode {}:'{}' to '{}'",encoding,field,TypeUtil.toHexString(buffer.array(),buffer.arrayOffset()+p,e-p));
}
}
}
private void encodeName(ByteBuffer buffer, byte mask, int bits, String name, Entry entry)
{
buffer.put(mask);
@ -339,7 +327,7 @@ public class HpackEncoder
NBitInteger.encode(buffer,bits,_context.index(entry));
}
}
static void encodeValue(ByteBuffer buffer, boolean huffman, String value)
{
if (huffman)

View File

@ -38,6 +38,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
private int maxDynamicTableSize = 4096;
private int initialStreamSendWindow = FlowControlStrategy.DEFAULT_WINDOW_SIZE;
private int maxConcurrentStreams = -1;
private int maxHeaderBlockFragment = 0;
private final HttpConfiguration httpConfiguration;
public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration)
@ -81,6 +82,16 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
this.maxConcurrentStreams = maxConcurrentStreams;
}
public int getMaxHeaderBlockFragment()
{
return maxHeaderBlockFragment;
}
public void setMaxHeaderBlockFragment(int maxHeaderBlockFragment)
{
this.maxHeaderBlockFragment = maxHeaderBlockFragment;
}
public HttpConfiguration getHttpConfiguration()
{
return httpConfiguration;
@ -91,7 +102,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
{
ServerSessionListener listener = newSessionListener(connector, endPoint);
Generator generator = new Generator(connector.getByteBufferPool(), getMaxDynamicTableSize());
Generator generator = new Generator(connector.getByteBufferPool(), getMaxDynamicTableSize(), getMaxHeaderBlockFragment());
FlowControlStrategy flowControl = newFlowControlStrategy();
HTTP2ServerSession session = new HTTP2ServerSession(connector.getScheduler(), endPoint, generator, listener, flowControl);
session.setMaxLocalStreams(getMaxConcurrentStreams());

View File

@ -26,10 +26,13 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -38,12 +41,15 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ManagedSelector;
@ -390,4 +396,142 @@ public class HTTP2ServerTest extends AbstractServerTest
logger.setHideStacks(false);
}
}
@Test
public void testRequestWithContinuationFrames() throws Exception
{
testRequestWithContinuationFrames(new Callable<ByteBufferPool.Lease>()
{
@Override
public ByteBufferPool.Lease call() throws Exception
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
return lease;
}
});
}
@Test
public void testRequestWithContinuationFramesWithEmptyHeadersFrame() throws Exception
{
testRequestWithContinuationFrames(new Callable<ByteBufferPool.Lease>()
{
@Override
public ByteBufferPool.Lease call() throws Exception
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
// Take the HeadersFrame header and set the length to zero.
List<ByteBuffer> buffers = lease.getByteBuffers();
ByteBuffer headersFrameHeader = buffers.get(1);
headersFrameHeader.put(0, (byte)0);
headersFrameHeader.putShort(1, (short)0);
// Insert a CONTINUATION frame header for the body of the HEADERS frame.
lease.insert(2, buffers.get(3).slice(), false);
return lease;
}
});
}
@Test
public void testRequestWithContinuationFramesWithEmptyContinuationFrame() throws Exception
{
testRequestWithContinuationFrames(new Callable<ByteBufferPool.Lease>()
{
@Override
public ByteBufferPool.Lease call() throws Exception
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
// Take the ContinuationFrame header, duplicate it, and set the length to zero.
List<ByteBuffer> buffers = lease.getByteBuffers();
ByteBuffer continuationFrameHeader = buffers.get(3);
ByteBuffer duplicate = ByteBuffer.allocate(continuationFrameHeader.remaining());
duplicate.put(continuationFrameHeader).flip();
continuationFrameHeader.flip();
continuationFrameHeader.put(0, (byte)0);
continuationFrameHeader.putShort(1, (short)0);
// Insert a CONTINUATION frame header for the body of the previous CONTINUATION frame.
lease.insert(4, duplicate, false);
return lease;
}
});
}
@Test
public void testRequestWithContinuationFramesWithEmptyLastContinuationFrame() throws Exception
{
testRequestWithContinuationFrames(new Callable<ByteBufferPool.Lease>()
{
@Override
public ByteBufferPool.Lease call() throws Exception
{
ByteBufferPool.Lease lease = new ByteBufferPool.Lease(byteBufferPool);
generator.control(lease, new PrefaceFrame());
MetaData.Request metaData = newRequest("GET", new HttpFields());
generator.control(lease, new HeadersFrame(1, metaData, null, true));
// Take the last CONTINUATION frame and reset the flag.
List<ByteBuffer> buffers = lease.getByteBuffers();
ByteBuffer continuationFrameHeader = buffers.get(buffers.size() - 2);
continuationFrameHeader.put(4, (byte)0);
// Add a last, empty, CONTINUATION frame.
ByteBuffer last = ByteBuffer.wrap(new byte[]{
0, 0, 0, // Length
(byte)FrameType.CONTINUATION.getType(),
(byte)Flags.END_HEADERS,
0, 0, 0, 1 // Stream ID
});
lease.append(last, false);
return lease;
}
});
}
private void testRequestWithContinuationFrames(Callable<ByteBufferPool.Lease> frames) throws Exception
{
final CountDownLatch serverLatch = new CountDownLatch(1);
startServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
serverLatch.countDown();
}
});
generator = new Generator(byteBufferPool, 4096, 4);
ByteBufferPool.Lease lease = frames.call();
try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
for (ByteBuffer buffer : lease.getByteBuffers())
output.write(BufferUtil.toArray(buffer));
output.flush();
Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
final CountDownLatch clientLatch = new CountDownLatch(1);
Parser parser = new Parser(byteBufferPool, new Parser.Listener.Adapter()
{
@Override
public void onHeaders(HeadersFrame frame)
{
if (frame.isEndStream())
clientLatch.countDown();
}
}, 4096, 8192);
boolean closed = parseResponse(client, parser);
Assert.assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
Assert.assertFalse(closed);
}
}
}