Made sure that when HttpServletRequest.isReady() returns false, it is possible to call it again without getting an exception. Fix ProxyServletTest.testExpect100ContinueRespond100ContinueSomeRequestContentThenFailure by flipping the input state to IDLE before unblocking a blocking read. Signed-off-by: Simone Bordet <simone.bordet@gmail.com> Signed-off-by: Ludovic Orban <lorban@bitronix.be> Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
c74437f9a8
commit
7de4b6a0e9
|
@ -219,6 +219,18 @@ class AsyncContentProducer implements ContentProducer
|
|||
public boolean isReady()
|
||||
{
|
||||
assertLocked();
|
||||
|
||||
ServletChannelState state = _servletChannel.getServletRequestState();
|
||||
|
||||
// If already unready, do not read via produceChunk();
|
||||
// rather, wait for the demand callback to be invoked.
|
||||
if (state.isInputUnready())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isReady(), unready {}", this);
|
||||
return false;
|
||||
}
|
||||
|
||||
Content.Chunk chunk = produceChunk();
|
||||
if (chunk != null)
|
||||
{
|
||||
|
@ -227,9 +239,11 @@ class AsyncContentProducer implements ContentProducer
|
|||
return true;
|
||||
}
|
||||
|
||||
_servletChannel.getServletRequestState().onReadUnready();
|
||||
state.onReadUnready();
|
||||
_servletChannel.getRequest().demand(() ->
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isReady() demand callback {}", this);
|
||||
if (_servletChannel.getHttpInput().onContentProducible())
|
||||
_servletChannel.handle();
|
||||
});
|
||||
|
|
|
@ -159,7 +159,11 @@ class BlockingContentProducer implements ContentProducer
|
|||
// Do not release the semaphore if we are not unready, as certain protocols may call this method
|
||||
// just after having received the request, not only when they have read all the available content.
|
||||
if (unready)
|
||||
{
|
||||
// Call nextChunk() to switch the input state back to IDLE, otherwise we would stay UNREADY.
|
||||
_asyncContentProducer.nextChunk();
|
||||
_semaphore.release();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.eclipse.jetty.client.AsyncRequestContent;
|
|||
import org.eclipse.jetty.client.BufferingResponseListener;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.Destination;
|
||||
import org.eclipse.jetty.client.FutureResponseListener;
|
||||
import org.eclipse.jetty.client.InputStreamRequestContent;
|
||||
import org.eclipse.jetty.client.OutputStreamRequestContent;
|
||||
import org.eclipse.jetty.client.Response;
|
||||
|
@ -1460,6 +1461,85 @@ public class AsyncIOServletTest extends AbstractTest
|
|||
assertEquals(1, allDataReadCount.get());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("transportsNoFCGI")
|
||||
public void testIsReadyIdempotent(Transport transport) throws Exception
|
||||
{
|
||||
CountDownLatch bodyLatch = new CountDownLatch(1);
|
||||
CountDownLatch dataLatch = new CountDownLatch(1);
|
||||
start(transport, new HttpServlet()
|
||||
{
|
||||
@Override
|
||||
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
|
||||
{
|
||||
AsyncContext asyncContext = request.startAsync();
|
||||
asyncContext.setTimeout(0);
|
||||
|
||||
ServletInputStream input = request.getInputStream();
|
||||
input.setReadListener(new ReadListener()
|
||||
{
|
||||
@Override
|
||||
public void onDataAvailable() throws IOException
|
||||
{
|
||||
while (input.isReady())
|
||||
{
|
||||
int read = input.read();
|
||||
if (read < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
// Call again isReady() to verify it is idempotent.
|
||||
if (input.isReady())
|
||||
throw new IOException();
|
||||
|
||||
dataLatch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAllDataRead()
|
||||
{
|
||||
response.setStatus(HttpStatus.NO_CONTENT_204);
|
||||
asyncContext.complete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable x)
|
||||
{
|
||||
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
|
||||
asyncContext.complete();
|
||||
}
|
||||
});
|
||||
|
||||
// The call to setReadListener() may call isReady(),
|
||||
// here call it again to verify that it is idempotent.
|
||||
assertFalse(input.isReady());
|
||||
|
||||
bodyLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
AsyncRequestContent body = new AsyncRequestContent();
|
||||
var request = client.newRequest(newURI(transport))
|
||||
.method(HttpMethod.POST)
|
||||
.body(body)
|
||||
.timeout(15, TimeUnit.SECONDS);
|
||||
FutureResponseListener listener = new FutureResponseListener(request);
|
||||
request.send(listener);
|
||||
|
||||
assertTrue(bodyLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Send a first chunk of body.
|
||||
body.write(ByteBuffer.allocate(512), Callback.NOOP);
|
||||
|
||||
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
// Complete the body.
|
||||
body.close();
|
||||
|
||||
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
|
||||
assertEquals(HttpStatus.NO_CONTENT_204, response.getStatus());
|
||||
}
|
||||
|
||||
private static class Listener implements ReadListener, WriteListener
|
||||
{
|
||||
private final Executor executor = Executors.newFixedThreadPool(32);
|
||||
|
|
Loading…
Reference in New Issue