Merged branch 'jetty-9.3.x' into 'master'.

This commit is contained in:
Simone Bordet 2015-11-25 14:00:19 +01:00
commit 50041395f9
20 changed files with 397 additions and 231 deletions

View File

@ -73,7 +73,7 @@ public class MultiPartContentProvider extends AbstractTypedContentProvider imple
private final ByteBuffer onlyBoundary;
private final ByteBuffer lastBoundary;
private Listener listener;
private long length;
private long length=-1;
public MultiPartContentProvider()
{
@ -151,6 +151,7 @@ public class MultiPartContentProvider extends AbstractTypedContentProvider imple
private void addPart(Part part)
{
parts.add(part);
length=-1;
if (LOG.isDebugEnabled())
LOG.debug("Added {}", part);
}
@ -159,7 +160,11 @@ public class MultiPartContentProvider extends AbstractTypedContentProvider imple
public void setListener(Listener listener)
{
this.listener = listener;
calculateLength();
}
private void calculateLength()
{
// Compute the length, if possible.
if (parts.isEmpty())
{
@ -187,6 +192,8 @@ public class MultiPartContentProvider extends AbstractTypedContentProvider imple
@Override
public long getLength()
{
if (length<=0)
calculateLength();
return length;
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.jetty.http.HttpTokens.EndOfContent;
import org.eclipse.jetty.util.BufferUtil;
@ -67,7 +69,7 @@ public class HttpGenerator
private final int _send;
private final static int SEND_SERVER = 0x01;
private final static int SEND_XPOWEREDBY = 0x02;
private final static Set<String> __assumedContentMethods = new HashSet<>(Arrays.asList(new String[]{HttpMethod.POST.asString(),HttpMethod.PUT.asString()}));
/* ------------------------------------------------------------------------------- */
public static void setJettyVersion(String serverVersion)
@ -206,7 +208,6 @@ public class HttpGenerator
if (info==null)
return Result.NEED_INFO;
// Do we need a request header
if (header==null)
return Result.NEED_HEADER;
@ -226,9 +227,9 @@ public class HttpGenerator
generateRequestLine(info,header);
if (info.getVersion()==HttpVersion.HTTP_0_9)
_noContent=true;
else
generateHeaders(info,header,content,last);
throw new IllegalArgumentException("HTTP/0.9 not supported");
generateHeaders(info,header,content,last);
boolean expect100 = info.getFields().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
@ -342,25 +343,26 @@ public class HttpGenerator
if (info==null)
return Result.NEED_INFO;
// Handle 0.9
if (info.getVersion() == HttpVersion.HTTP_0_9)
switch(info.getVersion())
{
_persistent = false;
_endOfContent=EndOfContent.EOF_CONTENT;
if (BufferUtil.hasContent(content))
_contentPrepared+=content.remaining();
_state = last?State.COMPLETING:State.COMMITTED;
return Result.FLUSH;
case HTTP_1_0:
if (_persistent==null)
_persistent=Boolean.FALSE;
break;
case HTTP_1_1:
if (_persistent==null)
_persistent=Boolean.TRUE;
break;
default:
throw new IllegalArgumentException(info.getVersion()+" not supported");
}
// Do we need a response header
if (header==null)
return Result.NEED_HEADER;
// If we have not been told our persistence, set the default
if (_persistent==null)
_persistent=(info.getVersion().ordinal() > HttpVersion.HTTP_1_0.ordinal());
// prepare the header
int pos=BufferUtil.flipToFill(header);
try
@ -513,16 +515,8 @@ public class HttpGenerator
header.put(StringUtil.getBytes(request.getMethod()));
header.put((byte)' ');
header.put(StringUtil.getBytes(request.getURIString()));
switch(request.getVersion())
{
case HTTP_1_0:
case HTTP_1_1:
header.put((byte)' ');
header.put(request.getVersion().toBytes());
break;
default:
throw new IllegalStateException();
}
header.put((byte)' ');
header.put(request.getVersion().toBytes());
header.put(HttpTokens.CRLF);
}
@ -589,126 +583,132 @@ public class HttpGenerator
boolean close=false;
boolean content_type=false;
StringBuilder connection = null;
long content_length = _info.getContentLength();
// Generate fields
if (_info.getFields() != null)
HttpFields fields = _info.getFields();
if (fields != null)
{
for (HttpField field : _info.getFields())
int n=fields.size();
for (int f=0;f<n;f++)
{
HttpField field = fields.getField(f);
String v = field.getValue();
if (v==null || v.length()==0)
continue; // rfc7230 does not allow no value
HttpHeader h = field.getHeader();
switch (h==null?HttpHeader.UNKNOWN:h)
if (h==null)
putTo(field,header);
else
{
case CONTENT_LENGTH:
// handle specially below
if (_info.getContentLength()>=0)
switch (h)
{
case CONTENT_LENGTH:
_endOfContent=EndOfContent.CONTENT_LENGTH;
break;
if (content_length<0)
content_length=Long.valueOf(field.getValue());
// handle setting the field specially below
break;
case CONTENT_TYPE:
{
if (field.getValue().startsWith(MimeTypes.Type.MULTIPART_BYTERANGES.toString()))
_endOfContent=EndOfContent.SELF_DEFINING_CONTENT;
// write the field to the header
content_type=true;
putTo(field,header);
break;
}
case TRANSFER_ENCODING:
{
if (_info.getVersion() == HttpVersion.HTTP_1_1)
transfer_encoding = field;
// Do NOT add yet!
break;
}
case CONNECTION:
{
if (request!=null)
case CONTENT_TYPE:
{
// write the field to the header
content_type=true;
putTo(field,header);
// Lookup and/or split connection value field
HttpHeaderValue[] values = HttpHeaderValue.CLOSE.is(field.getValue())?CLOSE:new HttpHeaderValue[]{HttpHeaderValue.CACHE.get(field.getValue())};
String[] split = null;
if (values[0]==null)
{
split = StringUtil.csvSplit(field.getValue());
if (split.length>0)
{
values=new HttpHeaderValue[split.length];
for (int i=0;i<split.length;i++)
values[i]=HttpHeaderValue.CACHE.get(split[i]);
}
break;
}
// Handle connection values
for (int i=0;i<values.length;i++)
case TRANSFER_ENCODING:
{
HttpHeaderValue value=values[i];
switch (value==null?HttpHeaderValue.UNKNOWN:value)
if (_info.getVersion() == HttpVersion.HTTP_1_1)
transfer_encoding = field;
// Do NOT add yet!
break;
}
case CONNECTION:
{
if (request!=null)
putTo(field,header);
// Lookup and/or split connection value field
HttpHeaderValue[] values = HttpHeaderValue.CLOSE.is(field.getValue())?CLOSE:new HttpHeaderValue[]{HttpHeaderValue.CACHE.get(field.getValue())};
String[] split = null;
if (values[0]==null)
{
case UPGRADE:
split = StringUtil.csvSplit(field.getValue());
if (split.length>0)
{
// special case for websocket connection ordering
header.put(HttpHeader.CONNECTION.getBytesColonSpace()).put(HttpHeader.UPGRADE.getBytes());
header.put(CRLF);
break;
values=new HttpHeaderValue[split.length];
for (int i=0;i<split.length;i++)
values[i]=HttpHeaderValue.CACHE.get(split[i]);
}
}
case CLOSE:
// Handle connection values
for (int i=0;i<values.length;i++)
{
HttpHeaderValue value=values[i];
switch (value==null?HttpHeaderValue.UNKNOWN:value)
{
close=true;
_persistent=false;
if (response!=null)
case UPGRADE:
{
if (_endOfContent == EndOfContent.UNKNOWN_CONTENT)
_endOfContent=EndOfContent.EOF_CONTENT;
// special case for websocket connection ordering
header.put(HttpHeader.CONNECTION.getBytesColonSpace()).put(HttpHeader.UPGRADE.getBytes());
header.put(CRLF);
break;
}
break;
}
case KEEP_ALIVE:
{
if (_info.getVersion() == HttpVersion.HTTP_1_0)
case CLOSE:
{
keep_alive = true;
close=true;
_persistent=false;
if (response!=null)
_persistent=true;
{
if (_endOfContent == EndOfContent.UNKNOWN_CONTENT)
_endOfContent=EndOfContent.EOF_CONTENT;
}
break;
}
break;
}
default:
{
if (connection==null)
connection=new StringBuilder();
else
connection.append(',');
connection.append(split==null?field.getValue():split[i]);
case KEEP_ALIVE:
{
if (_info.getVersion() == HttpVersion.HTTP_1_0)
{
keep_alive = true;
if (response!=null)
_persistent=true;
}
break;
}
default:
{
if (connection==null)
connection=new StringBuilder();
else
connection.append(',');
connection.append(split==null?field.getValue():split[i]);
}
}
}
// Do NOT add yet!
break;
}
// Do NOT add yet!
break;
}
case SERVER:
{
send=send&~SEND_SERVER;
putTo(field,header);
break;
}
case SERVER:
{
send=send&~SEND_SERVER;
putTo(field,header);
break;
default:
putTo(field,header);
}
default:
putTo(field,header);
}
}
}
@ -716,13 +716,15 @@ public class HttpGenerator
// Calculate how to end _content and connection, _content length and transfer encoding
// settings.
// From http://tools.ietf.org/html/rfc7230#section-3.3.3
// From RFC 2616 4.4:
// 1. No body for 1xx, 204, 304 & HEAD response
// 2. Force _content-length?
// 3. If Transfer-Encoding!=identity && HTTP/1.1 && !HttpConnection==close then chunk
// 4. Content-Length
// 5. multipart/byteranges
// 6. close
// 3. If Transfer-Encoding==(.*,)?chunked && HTTP/1.1 && !HttpConnection==close then chunk
// 5. Content-Length without Transfer-Encoding
// 6. Request and none over the above, then Content-Length=0 if POST/PUT
// 7. close
int status=response!=null?response.getStatus():-1;
switch (_endOfContent)
{
@ -731,13 +733,12 @@ public class HttpGenerator
// written yet?
// Response known not to have a body
if (_contentPrepared == 0 && response!=null && (status < 200 || status == 204 || status == 304))
if (_contentPrepared == 0 && response!=null && _noContent)
_endOfContent=EndOfContent.NO_CONTENT;
else if (_info.getContentLength()>0)
{
// we have been given a content length
_endOfContent=EndOfContent.CONTENT_LENGTH;
long content_length = _info.getContentLength();
if ((response!=null || content_length>0 || content_type ) && !_noContent)
{
// known length but not actually set.
@ -750,15 +751,13 @@ public class HttpGenerator
{
// we have seen all the _content there is, so we can be content-length limited.
_endOfContent=EndOfContent.CONTENT_LENGTH;
long content_length = _contentPrepared+BufferUtil.length(content);
long actual_length = _contentPrepared+BufferUtil.length(content);
if (content_length>=0 && content_length!=actual_length)
throw new IllegalArgumentException("Content-Length header("+content_length+") != actual("+actual_length+")");
// Do we need to tell the headers about it
if ((response!=null || content_length>0 || content_type ) && !_noContent)
{
header.put(HttpHeader.CONTENT_LENGTH.getBytesColonSpace());
BufferUtil.putDecLong(header, content_length);
header.put(HttpTokens.CRLF);
}
putContentLength(header,actual_length,content_type,request,response);
}
else
{
@ -775,32 +774,12 @@ public class HttpGenerator
case CONTENT_LENGTH:
{
long content_length = _info.getContentLength();
if ((response!=null || content_length>0 || content_type ) && !_noContent)
{
header.put(HttpHeader.CONTENT_LENGTH.getBytesColonSpace());
BufferUtil.putDecLong(header, content_length);
header.put(HttpTokens.CRLF);
}
putContentLength(header,content_length,content_type,request,response);
break;
}
case SELF_DEFINING_CONTENT:
{
// TODO - Should we do this? Why was it not required before?
long content_length = _info.getContentLength();
if (content_length>0)
{
header.put(HttpHeader.CONTENT_LENGTH.getBytesColonSpace());
BufferUtil.putDecLong(header, content_length);
header.put(HttpTokens.CRLF);
}
break;
}
case NO_CONTENT:
if (response!=null && status >= 200 && status != 204 && status != 304)
header.put(CONTENT_LENGTH_0);
break;
throw new IllegalStateException();
case EOF_CONTENT:
_persistent = request!=null;
@ -878,6 +857,22 @@ public class HttpGenerator
header.put(HttpTokens.CRLF);
}
/* ------------------------------------------------------------------------------- */
private void putContentLength(ByteBuffer header, long contentLength, boolean contentType, MetaData.Request request, MetaData.Response response)
{
if (contentLength>0)
{
header.put(HttpHeader.CONTENT_LENGTH.getBytesColonSpace());
BufferUtil.putDecLong(header, contentLength);
header.put(HttpTokens.CRLF);
}
else if (!_noContent)
{
if (contentType || response!=null || (request!=null && __assumedContentMethods.contains(request.getMethod())))
header.put(CONTENT_LENGTH_0);
}
}
/* ------------------------------------------------------------------------------- */
public static byte[] getReasonBuffer(int code)
{

View File

@ -32,7 +32,7 @@ public interface HttpTokens
static final byte[] CRLF = {CARRIAGE_RETURN,LINE_FEED};
static final byte SEMI_COLON= (byte)';';
public enum EndOfContent { UNKNOWN_CONTENT,NO_CONTENT,EOF_CONTENT,CONTENT_LENGTH,CHUNKED_CONTENT,SELF_DEFINING_CONTENT }
public enum EndOfContent { UNKNOWN_CONTENT,NO_CONTENT,EOF_CONTENT,CONTENT_LENGTH,CHUNKED_CONTENT }
}

View File

@ -43,7 +43,7 @@ public class HttpGeneratorClientTest
}
@Test
public void testRequestNoContent() throws Exception
public void testGETRequestNoContent() throws Exception
{
ByteBuffer header=BufferUtil.allocate(2048);
HttpGenerator gen = new HttpGenerator();
@ -77,7 +77,43 @@ public class HttpGeneratorClientTest
Assert.assertEquals(0, gen.getContentPrepared());
Assert.assertThat(out, Matchers.containsString("GET /index.html HTTP/1.1"));
Assert.assertThat(out, Matchers.not(Matchers.containsString("Content-Length")));
}
@Test
public void testPOSTRequestNoContent() throws Exception
{
ByteBuffer header=BufferUtil.allocate(2048);
HttpGenerator gen = new HttpGenerator();
HttpGenerator.Result
result=gen.generateRequest(null,null,null,null, true);
Assert.assertEquals(HttpGenerator.Result.NEED_INFO, result);
Assert.assertEquals(HttpGenerator.State.START, gen.getState());
Info info = new Info("POST","/index.html");
info.getFields().add("Host","something");
info.getFields().add("User-Agent","test");
Assert.assertTrue(!gen.isChunking());
result=gen.generateRequest(info,null,null,null, true);
Assert.assertEquals(HttpGenerator.Result.NEED_HEADER, result);
Assert.assertEquals(HttpGenerator.State.START, gen.getState());
result=gen.generateRequest(info,header,null,null, true);
Assert.assertEquals(HttpGenerator.Result.FLUSH, result);
Assert.assertEquals(HttpGenerator.State.COMPLETING, gen.getState());
Assert.assertTrue(!gen.isChunking());
String out = BufferUtil.toString(header);
BufferUtil.clear(header);
result=gen.generateResponse(null,null,null,null, false);
Assert.assertEquals(HttpGenerator.Result.DONE, result);
Assert.assertEquals(HttpGenerator.State.END, gen.getState());
Assert.assertTrue(!gen.isChunking());
Assert.assertEquals(0, gen.getContentPrepared());
Assert.assertThat(out, Matchers.containsString("POST /index.html HTTP/1.1"));
Assert.assertThat(out, Matchers.containsString("Content-Length: 0"));
}
@Test

View File

@ -59,14 +59,6 @@ public class HttpGeneratorServerHTTPTest
String response = run.result.build(run.httpVersion, gen, "OK\r\nTest", run.connection.val, null, run.chunks);
if (run.httpVersion == 9)
{
assertFalse(t, gen.isPersistent());
if (run.result._body != null)
assertEquals(t, run.result._body, response);
return;
}
HttpParser parser = new HttpParser(handler);
parser.setHeadResponse(run.result._head);
@ -80,8 +72,7 @@ public class HttpGeneratorServerHTTPTest
else
assertTrue(t, gen.isPersistent() || EnumSet.of(ConnectionType.CLOSE, ConnectionType.TE_CLOSE).contains(run.connection));
if (run.httpVersion > 9)
assertEquals("OK??Test", _reason);
assertEquals("OK??Test", _reason);
if (_content == null)
assertTrue(t, run.result._body == null);
@ -346,7 +337,7 @@ public class HttpGeneratorServerHTTPTest
for (Result result : results)
{
// Loop over HTTP versions
for (int v = 9; v <= 11; v++)
for (int v = 10; v <= 11; v++)
{
// Loop over chunks
for (int chunks = 1; chunks <= 6; chunks++)

View File

@ -395,7 +395,7 @@ public class HttpGeneratorServerTest
assertEquals(HttpGenerator.Result.NEED_INFO, result);
assertEquals(HttpGenerator.State.START, gen.getState());
MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, 200, null, new HttpFields(), 59);
MetaData.Response info = new MetaData.Response(HttpVersion.HTTP_1_1, 200, null, new HttpFields(), BufferUtil.length(content0)+BufferUtil.length(content1));
info.getFields().add("Last-Modified", DateGenerator.__01Jan1970);
result = gen.generateResponse(info, null, null, content0, false);
assertEquals(HttpGenerator.Result.NEED_HEADER, result);

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -43,8 +45,10 @@ import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
@ -289,6 +293,131 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testMaxConcurrentStreams() throws Exception
{
int maxStreams = 2;
start(new ServerSessionListener.Adapter()
{
@Override
public Map<Integer, Integer> onPreface(Session session)
{
Map<Integer, Integer> settings = new HashMap<>(1);
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, maxStreams);
return settings;
}
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields(), 0);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
return null;
}
});
CountDownLatch settingsLatch = new CountDownLatch(1);
Session session = newClient(new Session.Listener.Adapter()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
});
Assert.assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
MetaData.Request request1 = newRequest("GET", new HttpFields());
FuturePromise<Stream> promise1 = new FuturePromise<>();
CountDownLatch exchangeLatch1 = new CountDownLatch(2);
session.newStream(new HeadersFrame(request1, null, false), promise1, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
exchangeLatch1.countDown();
}
});
Stream stream1 = promise1.get(5, TimeUnit.SECONDS);
MetaData.Request request2 = newRequest("GET", new HttpFields());
FuturePromise<Stream> promise2 = new FuturePromise<>();
CountDownLatch exchangeLatch2 = new CountDownLatch(2);
session.newStream(new HeadersFrame(request2, null, false), promise2, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
exchangeLatch2.countDown();
}
});
Stream stream2 = promise2.get(5, TimeUnit.SECONDS);
// The third stream must not be created.
MetaData.Request request3 = newRequest("GET", new HttpFields());
CountDownLatch maxStreamsLatch = new CountDownLatch(1);
session.newStream(new HeadersFrame(request3, null, false), new Promise.Adapter<Stream>()
{
@Override
public void failed(Throwable x)
{
if (x instanceof IllegalStateException)
maxStreamsLatch.countDown();
}
}, new Stream.Listener.Adapter());
Assert.assertTrue(maxStreamsLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(2, session.getStreams().size());
// End the second stream.
stream2.data(new DataFrame(stream2.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()
{
@Override
public void succeeded()
{
exchangeLatch2.countDown();
}
});
Assert.assertTrue(exchangeLatch2.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, session.getStreams().size());
// Create a fourth stream.
MetaData.Request request4 = newRequest("GET", new HttpFields());
CountDownLatch exchangeLatch4 = new CountDownLatch(2);
session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream result)
{
exchangeLatch4.countDown();
}
}, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
exchangeLatch4.countDown();
}
});
Assert.assertTrue(exchangeLatch4.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, session.getStreams().size());
// End the first stream.
stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), new Callback()
{
@Override
public void succeeded()
{
exchangeLatch1.countDown();
}
});
Assert.assertTrue(exchangeLatch2.await(5, TimeUnit.SECONDS));
Assert.assertEquals(0, session.getStreams().size());
}
@Test
public void testInvalidAPIUsageOnClient() throws Exception
{
@ -383,19 +512,6 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
private static void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException();
}
}
}
/*
@Test
public void testInvalidAPIUsageOnServer() throws Exception
{
@ -469,4 +585,15 @@ public class HTTP2Test extends AbstractTest
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
}
*/
private static void sleep(long time)
{
try
{
Thread.sleep(time);
}
catch (InterruptedException x)
{
throw new RuntimeException();
}
}
}

View File

@ -47,14 +47,14 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy
}
@Override
public void onStreamCreated(IStream stream, boolean local)
public void onStreamCreated(IStream stream)
{
stream.updateSendWindow(initialStreamSendWindow);
stream.updateRecvWindow(initialStreamRecvWindow);
}
@Override
public void onStreamDestroyed(IStream stream, boolean local)
public void onStreamDestroyed(IStream stream)
{
}

View File

@ -68,17 +68,17 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
}
@Override
public void onStreamCreated(IStream stream, boolean local)
public void onStreamCreated(IStream stream)
{
super.onStreamCreated(stream, local);
super.onStreamCreated(stream);
streamLevels.put(stream, new AtomicInteger());
}
@Override
public void onStreamDestroyed(IStream stream, boolean local)
public void onStreamDestroyed(IStream stream)
{
streamLevels.remove(stream);
super.onStreamDestroyed(stream, local);
super.onStreamDestroyed(stream);
}
@Override

View File

@ -24,9 +24,9 @@ public interface FlowControlStrategy
{
public static int DEFAULT_WINDOW_SIZE = 65535;
public void onStreamCreated(IStream stream, boolean local);
public void onStreamCreated(IStream stream);
public void onStreamDestroyed(IStream stream, boolean local);
public void onStreamDestroyed(IStream stream);
public void updateInitialStreamWindow(ISession session, int initialStreamWindow, boolean local);

View File

@ -366,7 +366,7 @@ public class HTTP2Flusher extends IteratingCallback
if (stream != null)
{
stream.close();
stream.getSession().removeStream(stream, true);
stream.getSession().removeStream(stream);
}
callback.failed(x);
}

View File

@ -618,11 +618,11 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
break;
}
IStream stream = newStream(streamId);
IStream stream = newStream(streamId, true);
if (streams.putIfAbsent(streamId, stream) == null)
{
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream, true);
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
return stream;
@ -650,14 +650,14 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
break;
}
IStream stream = newStream(streamId);
IStream stream = newStream(streamId, false);
// SPEC: duplicate stream is treated as connection error.
if (streams.putIfAbsent(streamId, stream) == null)
{
updateLastStreamId(streamId);
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream, false);
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
return stream;
@ -669,28 +669,29 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
}
}
protected IStream newStream(int streamId)
protected IStream newStream(int streamId, boolean local)
{
return new HTTP2Stream(scheduler, this, streamId);
return new HTTP2Stream(scheduler, this, streamId, local);
}
@Override
public void removeStream(IStream stream, boolean local)
public void removeStream(IStream stream)
{
IStream removed = streams.remove(stream.getId());
if (removed != null)
{
assert removed == stream;
boolean local = stream.isLocal();
if (local)
localStreamCount.decrementAndGet();
else
remoteStreamCount.decrementAndGet();
flowControl.onStreamDestroyed(stream, local);
flowControl.onStreamDestroyed(stream);
if (LOG.isDebugEnabled())
LOG.debug("Removed {}", stream);
LOG.debug("Removed {} {}", local ? "local" : "remote", stream);
}
}
@ -1058,7 +1059,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
{
HeadersFrame headersFrame = (HeadersFrame)frame;
if (stream.updateClose(headersFrame.isEndStream(), true))
removeStream(stream, true);
removeStream(stream);
break;
}
case RST_STREAM:
@ -1066,7 +1067,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
if (stream != null)
{
stream.close();
removeStream(stream, true);
removeStream(stream);
}
break;
}
@ -1174,7 +1175,7 @@ public abstract class HTTP2Session implements ISession, Parser.Listener
// Only now we can update the close state
// and eventually remove the stream.
if (stream.updateClose(dataFrame.isEndStream(), true))
removeStream(stream, true);
removeStream(stream);
callback.succeeded();
}
}

View File

@ -51,15 +51,17 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
private final AtomicInteger recvWindow = new AtomicInteger();
private final ISession session;
private final int streamId;
private final boolean local;
private volatile Listener listener;
private volatile boolean localReset;
private volatile boolean remoteReset;
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
{
super(scheduler);
this.session = session;
this.streamId = streamId;
this.local = local;
}
@Override
@ -68,6 +70,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
return streamId;
}
@Override
public boolean isLocal()
{
return local;
}
@Override
public ISession getSession()
{
@ -242,7 +250,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
private void onHeaders(HeadersFrame frame, Callback callback)
{
if (updateClose(frame.isEndStream(), false))
session.removeStream(this, false);
session.removeStream(this);
callback.succeeded();
}
@ -273,7 +281,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
}
if (updateClose(frame.isEndStream(), false))
session.removeStream(this, false);
session.removeStream(this);
notifyData(this, frame, callback);
}
@ -281,7 +289,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
{
remoteReset = true;
close();
session.removeStream(this, false);
session.removeStream(this);
callback.succeeded();
notifyReset(this, frame);
}

View File

@ -41,9 +41,8 @@ public interface ISession extends Session
* <p>Removes the given {@code stream}.</p>
*
* @param stream the stream to remove
* @param local whether the stream is local or remote
*/
public void removeStream(IStream stream, boolean local);
public void removeStream(IStream stream);
/**
* <p>Enqueues the given frames to be written to the connection.</p>

View File

@ -39,6 +39,11 @@ public interface IStream extends Stream, Closeable
*/
public static final String CHANNEL_ATTRIBUTE = IStream.class.getName() + ".channel";
/**
* @return whether this stream is local or remote
*/
public boolean isLocal();
@Override
public ISession getSession();

View File

@ -134,11 +134,18 @@ public interface Session
public interface Listener
{
/**
* <p>Callback method invoked when the preface has been received.</p>
* <p>Callback method invoked:</p>
* <ul>
* <li>for clients, just before the preface is sent, to gather the
* SETTINGS configuration options the client wants to send to the server;</li>
* <li>for servers, just after having received the preface, to gather
* the SETTINGS configuration options the server wants to send to the
* client.</li>
* </ul>
*
* @param session the session
* @return a (possibly empty or null) map containing SETTINGS configuration
* options that are sent after the preface.
* options to send.
*/
public Map<Integer, Integer> onPreface(Session session);

View File

@ -635,9 +635,11 @@ public abstract class AbstractProxyServlet extends HttpServlet
else
{
proxyResponse.resetBuffer();
failure.printStackTrace();
int status = failure instanceof TimeoutException ?
HttpStatus.GATEWAY_TIMEOUT_504 :
HttpStatus.BAD_GATEWAY_502;
System.err.println("STATUS="+status);
sendProxyResponseError(clientRequest, proxyResponse, status);
}
}

View File

@ -305,7 +305,12 @@ public class ProxyServletFailureTest
.content(new BytesContentProvider(content))
.send();
Assert.assertEquals(500, response.getStatus());
// TODO which is correct?
if (proxyServlet instanceof AsyncProxyServlet)
Assert.assertEquals(502, response.getStatus());
else
Assert.assertEquals(500, response.getStatus());
}
@Test(expected = TimeoutException.class)

View File

@ -583,7 +583,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (handler!=null)
content=handler.badMessageError(status,reason,fields);
sendResponse(new MetaData.Response(HttpVersion.HTTP_1_1,status,reason,fields,0),content ,true);
sendResponse(new MetaData.Response(HttpVersion.HTTP_1_1,status,reason,fields,BufferUtil.length(content)),content ,true);
}
}
catch (IOException e)

View File

@ -689,24 +689,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{
case NEED_HEADER:
{
// Look for optimisation to avoid allocating a _header buffer
/*
Cannot use this optimisation unless we work out how not to overwrite data in user passed arrays.
if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
{
// use spare space in content buffer for header buffer
int p=_content.position();
int l=_content.limit();
_content.position(l);
_content.limit(l+_config.getResponseHeaderSize());
_header=_content.slice();
_header.limit(0);
_content.position(p);
_content.limit(l);
}
else
*/
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
continue;
}