477890 - Overwhelmed HTTP/2 server discards data.
HttpInput was using a bounded ArrayQueue with max capacity 64. The queue was overflowing if there were more than 64 reads within the flow control window capacity. Fixed by replacing the ArrayQueue with ArrayDeque, which is unbounded.
This commit is contained in:
parent
f833f36c64
commit
460673f04b
|
@ -21,7 +21,9 @@ package org.eclipse.jetty.server;
|
|||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import javax.servlet.ReadListener;
|
||||
|
@ -29,7 +31,6 @@ import javax.servlet.ServletInputStream;
|
|||
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
import org.eclipse.jetty.io.RuntimeIOException;
|
||||
import org.eclipse.jetty.util.ArrayQueue;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -48,9 +49,9 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
private final static Logger LOG = Log.getLogger(HttpInput.class);
|
||||
private final static Content EOF_CONTENT = new EofContent("EOF");
|
||||
private final static Content EARLY_EOF_CONTENT = new EofContent("EARLY_EOF");
|
||||
|
||||
|
||||
private final byte[] _oneByteBuffer = new byte[1];
|
||||
private final ArrayQueue<Content> _inputQ = new ArrayQueue<>();
|
||||
private final Queue<Content> _inputQ = new ArrayDeque<>();
|
||||
private final HttpChannelState _channelState;
|
||||
private ReadListener _listener;
|
||||
private State _state = STREAM;
|
||||
|
@ -63,21 +64,21 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0)
|
||||
_blockingTimeoutAt=0;
|
||||
}
|
||||
|
||||
|
||||
protected HttpChannelState getHttpChannelState()
|
||||
{
|
||||
return _channelState;
|
||||
}
|
||||
|
||||
|
||||
public void recycle()
|
||||
{
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
Content item = _inputQ.pollUnsafe();
|
||||
Content item = _inputQ.poll();
|
||||
while (item != null)
|
||||
{
|
||||
item.failed(null);
|
||||
item = _inputQ.pollUnsafe();
|
||||
item = _inputQ.poll();
|
||||
}
|
||||
_listener = null;
|
||||
_state = STREAM;
|
||||
|
@ -92,7 +93,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
boolean woken=false;
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
Content content = _inputQ.peekUnsafe();
|
||||
Content content = _inputQ.peek();
|
||||
if (content==null)
|
||||
{
|
||||
try
|
||||
|
@ -103,13 +104,13 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
woken=failed(e);
|
||||
}
|
||||
content = _inputQ.peekUnsafe();
|
||||
content = _inputQ.peek();
|
||||
}
|
||||
|
||||
|
||||
if (content!=null)
|
||||
available= remaining(content);
|
||||
}
|
||||
|
||||
|
||||
if (woken)
|
||||
wake();
|
||||
return available;
|
||||
|
@ -117,10 +118,10 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
private void wake()
|
||||
{
|
||||
_channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
|
||||
_channelState.getHttpChannel().getConnector().getExecutor().execute(_channelState.getHttpChannel());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public int read() throws IOException
|
||||
{
|
||||
|
@ -137,7 +138,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
if (_blockingTimeoutAt>=0 && !isAsync())
|
||||
_blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout();
|
||||
|
||||
|
||||
while(true)
|
||||
{
|
||||
Content item = nextContent();
|
||||
|
@ -146,12 +147,12 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} read {} from {}",this,len,item);
|
||||
int l = get(item, b, off, len);
|
||||
|
||||
|
||||
consumeNonContent();
|
||||
|
||||
|
||||
return l;
|
||||
}
|
||||
|
||||
|
||||
if (!_state.blockForContent(this))
|
||||
return _state.noContent();
|
||||
}
|
||||
|
@ -159,7 +160,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
}
|
||||
|
||||
/**
|
||||
* Called when derived implementations should attempt to
|
||||
* Called when derived implementations should attempt to
|
||||
* produce more Content and add it via {@link #addContent(Content)}.
|
||||
* For protocols that are constantly producing (eg HTTP2) this can
|
||||
* be left as a noop;
|
||||
|
@ -168,11 +169,11 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
protected void produceContent() throws IOException
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the next content from the inputQ, calling {@link #produceContent()}
|
||||
* if need be. EOF is processed and state changed.
|
||||
*
|
||||
*
|
||||
* @return the content or null if none available.
|
||||
* @throws IOException if retrieving the content fails
|
||||
*/
|
||||
|
@ -186,7 +187,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
}
|
||||
return content;
|
||||
}
|
||||
|
||||
|
||||
/** Poll the inputQ for Content.
|
||||
* Consumed buffers and {@link PoisonPillContent}s are removed and
|
||||
* EOF state updated if need be.
|
||||
|
@ -195,11 +196,11 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
protected Content pollContent()
|
||||
{
|
||||
// Items are removed only when they are fully consumed.
|
||||
Content content = _inputQ.peekUnsafe();
|
||||
Content content = _inputQ.peek();
|
||||
// Skip consumed items at the head of the queue.
|
||||
while (content != null && remaining(content) == 0)
|
||||
{
|
||||
_inputQ.pollUnsafe();
|
||||
_inputQ.poll();
|
||||
content.succeeded();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} consumed {}", this, content);
|
||||
|
@ -212,45 +213,45 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
_state=AEOF;
|
||||
boolean woken = _channelState.onReadReady(); // force callback?
|
||||
if (woken)
|
||||
if (woken)
|
||||
wake();
|
||||
}
|
||||
}
|
||||
else if (content==EARLY_EOF_CONTENT)
|
||||
_state=EARLY_EOF;
|
||||
|
||||
content = _inputQ.peekUnsafe();
|
||||
content = _inputQ.peek();
|
||||
}
|
||||
|
||||
|
||||
return content;
|
||||
}
|
||||
|
||||
/**
|
||||
/**
|
||||
*/
|
||||
protected void consumeNonContent()
|
||||
{
|
||||
// Items are removed only when they are fully consumed.
|
||||
Content content = _inputQ.peekUnsafe();
|
||||
Content content = _inputQ.peek();
|
||||
// Skip consumed items at the head of the queue.
|
||||
while (content != null && remaining(content) == 0)
|
||||
{
|
||||
// Defer EOF until read
|
||||
if (content instanceof EofContent)
|
||||
break;
|
||||
|
||||
|
||||
// Consume all other empty content
|
||||
_inputQ.pollUnsafe();
|
||||
_inputQ.poll();
|
||||
content.succeeded();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} consumed {}", this, content);
|
||||
content = _inputQ.peekUnsafe();
|
||||
}
|
||||
content = _inputQ.peek();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the next readable from the inputQ, calling {@link #produceContent()}
|
||||
* if need be. EOF is NOT processed and state is not changed.
|
||||
*
|
||||
*
|
||||
* @return the content or EOF or null if none available.
|
||||
* @throws IOException if retrieving the content fails
|
||||
*/
|
||||
|
@ -273,22 +274,22 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
protected Content pollReadable()
|
||||
{
|
||||
// Items are removed only when they are fully consumed.
|
||||
Content content = _inputQ.peekUnsafe();
|
||||
|
||||
Content content = _inputQ.peek();
|
||||
|
||||
// Skip consumed items at the head of the queue except EOF
|
||||
while (content != null)
|
||||
{
|
||||
if (content==EOF_CONTENT || content==EARLY_EOF_CONTENT || remaining(content)>0)
|
||||
return content;
|
||||
|
||||
_inputQ.pollUnsafe();
|
||||
|
||||
_inputQ.poll();
|
||||
content.succeeded();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} consumed {}", this, content);
|
||||
content = _inputQ.peekUnsafe();
|
||||
content = _inputQ.peek();
|
||||
}
|
||||
|
||||
return content;
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -334,7 +335,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
pollContent(); // hungry succeed
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Blocks until some content or some end-of-file event arrives.
|
||||
*
|
||||
|
@ -358,7 +359,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
_inputQ.wait(timeout);
|
||||
else
|
||||
_inputQ.wait();
|
||||
|
||||
|
||||
if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt)
|
||||
throw new TimeoutException();
|
||||
}
|
||||
|
@ -367,7 +368,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
throw (IOException)new InterruptedIOException().initCause(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Adds some content to this input stream.
|
||||
*
|
||||
|
@ -379,16 +380,16 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
boolean woken=false;
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
_inputQ.addUnsafe(item);
|
||||
_inputQ.offer(item);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} addContent {}", this, item);
|
||||
|
||||
|
||||
if (_listener==null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken=_channelState.onReadPossible();
|
||||
woken=_channelState.onReadPossible();
|
||||
}
|
||||
|
||||
|
||||
return woken;
|
||||
}
|
||||
|
||||
|
@ -396,10 +397,10 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
return _inputQ.sizeUnsafe()>0;
|
||||
return _inputQ.size()>0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void unblock()
|
||||
{
|
||||
synchronized (_inputQ)
|
||||
|
@ -407,7 +408,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
_inputQ.notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public long getContentConsumed()
|
||||
{
|
||||
synchronized (_inputQ)
|
||||
|
@ -450,7 +451,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
Content item = nextContent();
|
||||
if (item == null)
|
||||
break; // Let's not bother blocking
|
||||
|
||||
|
||||
skip(item, remaining(item));
|
||||
}
|
||||
return isFinished() && !isError();
|
||||
|
@ -470,7 +471,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
return _state instanceof ErrorState;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean isAsync()
|
||||
{
|
||||
synchronized (_inputQ)
|
||||
|
@ -487,7 +488,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
return _state instanceof EOFState;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isReady()
|
||||
|
@ -531,7 +532,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
_state = ASYNC;
|
||||
_listener = readListener;
|
||||
boolean content=nextContent()!=null;
|
||||
|
||||
|
||||
if (content)
|
||||
woken = _channelState.onReadReady();
|
||||
else
|
||||
|
@ -556,8 +557,8 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
LOG.warn(x);
|
||||
else
|
||||
_state = new ErrorState(x);
|
||||
|
||||
if (_listener==null)
|
||||
|
||||
if (_listener==null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken=_channelState.onReadPossible();
|
||||
|
@ -569,7 +570,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
/* ------------------------------------------------------------ */
|
||||
/*
|
||||
* <p>
|
||||
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
|
||||
* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a
|
||||
* runnable only so that the calling thread can use {@link ContextHandler#handle(Runnable)}
|
||||
* to setup classloaders etc.
|
||||
* </p>
|
||||
|
@ -585,7 +586,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
if (_state==EOF)
|
||||
return;
|
||||
|
||||
|
||||
if (_state==AEOF)
|
||||
{
|
||||
_state=EOF;
|
||||
|
@ -593,7 +594,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
}
|
||||
|
||||
listener = _listener;
|
||||
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
|
||||
error = _state instanceof ErrorState?((ErrorState)_state).getError():null;
|
||||
}
|
||||
|
||||
try
|
||||
|
@ -604,9 +605,13 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
listener.onError(error);
|
||||
}
|
||||
else if (aeof)
|
||||
listener.onAllDataRead();
|
||||
else if (error == null)
|
||||
{
|
||||
listener.onAllDataRead();
|
||||
}
|
||||
else
|
||||
{
|
||||
listener.onDataAvailable();
|
||||
}
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
|
@ -637,14 +642,14 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
super(BufferUtil.EMPTY_BUFFER);
|
||||
_name=name;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return _name;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class EofContent extends PoisonPillContent
|
||||
{
|
||||
EofContent(String name)
|
||||
|
@ -652,46 +657,46 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
super(name);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class Content implements Callback
|
||||
{
|
||||
private final ByteBuffer _content;
|
||||
|
||||
|
||||
public Content(ByteBuffer content)
|
||||
{
|
||||
_content=content;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isNonBlocking()
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
public ByteBuffer getContent()
|
||||
{
|
||||
return _content;
|
||||
}
|
||||
|
||||
|
||||
public boolean hasContent()
|
||||
{
|
||||
return _content.hasRemaining();
|
||||
}
|
||||
|
||||
|
||||
public int remaining()
|
||||
{
|
||||
return _content.remaining();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
protected static abstract class State
|
||||
{
|
||||
public boolean blockForContent(HttpInput in) throws IOException
|
||||
|
@ -708,7 +713,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
protected static class EOFState extends State
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
protected class ErrorState extends EOFState
|
||||
{
|
||||
final Throwable _error;
|
||||
|
@ -716,7 +721,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
_error=error;
|
||||
}
|
||||
|
||||
|
||||
public Throwable getError()
|
||||
{
|
||||
return _error;
|
||||
|
@ -767,7 +772,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
return "ASYNC";
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
protected static final State EARLY_EOF = new EOFState()
|
||||
{
|
||||
@Override
|
||||
|
@ -791,7 +796,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
return "EOF";
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
protected static final State AEOF = new EOFState()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -19,8 +19,10 @@
|
|||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletInputStream;
|
||||
|
@ -190,4 +192,61 @@ public class HttpClientTest extends AbstractTest
|
|||
Assert.assertEquals(200, response.getStatus());
|
||||
Assert.assertEquals(0, response.getContent().length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientManyWritesSlowServer() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
|
||||
long sleep = 1024;
|
||||
long total = 0;
|
||||
ServletInputStream input = request.getInputStream();
|
||||
byte[] buffer = new byte[1024];
|
||||
while (true)
|
||||
{
|
||||
int read = input.read(buffer);
|
||||
if (read < 0)
|
||||
break;
|
||||
total += read;
|
||||
if (total >= sleep)
|
||||
{
|
||||
sleep(250);
|
||||
sleep += 256;
|
||||
}
|
||||
}
|
||||
|
||||
response.getOutputStream().print(total);
|
||||
}
|
||||
});
|
||||
|
||||
int chunks = 256;
|
||||
int chunkSize = 16;
|
||||
byte[][] bytes = IntStream.range(0, chunks).mapToObj(x -> new byte[chunkSize]).toArray(byte[][]::new);
|
||||
BytesContentProvider contentProvider = new BytesContentProvider("application/octet-stream", bytes);
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.method(HttpMethod.POST)
|
||||
.content(contentProvider)
|
||||
.timeout(15, TimeUnit.SECONDS)
|
||||
.send();
|
||||
|
||||
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
Assert.assertEquals(chunks * chunkSize, Integer.parseInt(response.getContentAsString()));
|
||||
}
|
||||
|
||||
private void sleep(long time) throws IOException
|
||||
{
|
||||
try
|
||||
{
|
||||
Thread.sleep(time);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue