Fixes #4301 - Demand beforeContent is not forwarded.

Now correctly handling no demand before the content
in FCGI and HTTP2 transports.

Fixed HttpRequest to correctly forward onBeforeContent()
to wrapped listeners.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-11-12 23:33:21 +01:00
parent 4825832a53
commit aa57463175
11 changed files with 211 additions and 23 deletions

View File

@ -119,7 +119,7 @@ public abstract class HttpReceiver
}
}
private long demand()
protected long demand()
{
return demand(LongUnaryOperator.identity());
}

View File

@ -545,6 +545,12 @@ public class HttpRequest implements Request
{
this.responseListeners.add(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
listener.onBeforeContent(response, demand);
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer content, Callback callback)
{

View File

@ -420,13 +420,13 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
}
@Override
public void onHeaders(int request)
public boolean onHeaders(int request)
{
HttpChannelOverFCGI channel = activeChannels.get(request);
if (channel != null)
channel.responseHeaders();
else
noChannel(request);
return !channel.responseHeaders();
noChannel(request);
return false;
}
@Override

View File

@ -80,9 +80,9 @@ public class ClientParser extends Parser
}
@Override
public void onHeaders(int request)
public boolean onHeaders(int request)
{
listener.onHeaders(request);
return listener.onHeaders(request);
}
@Override

View File

@ -135,7 +135,11 @@ public abstract class Parser
{
void onHeader(int request, HttpField field);
void onHeaders(int request);
/**
* @param request the request id
* @return true to signal to the parser to stop parsing, false to continue parsing
*/
boolean onHeaders(int request);
/**
* @param request the request id
@ -158,8 +162,9 @@ public abstract class Parser
}
@Override
public void onHeaders(int request)
public boolean onHeaders(int request)
{
return false;
}
@Override

View File

@ -81,7 +81,7 @@ public class ResponseContentParser extends StreamContentParser
parsers.remove(request);
}
private class ResponseParser implements HttpParser.ResponseHandler
private static class ResponseParser implements HttpParser.ResponseHandler
{
private final HttpFields fields = new HttpFields();
private ClientParser.Listener listener;
@ -89,6 +89,7 @@ public class ResponseContentParser extends StreamContentParser
private final FCGIHttpParser httpParser;
private State state = State.HEADERS;
private boolean seenResponseCode;
private boolean stalled;
private ResponseParser(ClientParser.Listener listener, int request)
{
@ -110,7 +111,11 @@ public class ResponseContentParser extends StreamContentParser
case HEADERS:
{
if (httpParser.parseNext(buffer))
{
state = State.CONTENT_MODE;
if (stalled)
return true;
}
remaining = buffer.remaining();
break;
}
@ -239,16 +244,17 @@ public class ResponseContentParser extends StreamContentParser
}
}
private void notifyHeaders()
private boolean notifyHeaders()
{
try
{
listener.onHeaders(request);
return listener.onHeaders(request);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while invoking listener " + listener, x);
return false;
}
}
@ -261,8 +267,10 @@ public class ResponseContentParser extends StreamContentParser
notifyBegin(200, "OK");
notifyHeaders(fields);
}
notifyHeaders();
// Return from HTTP parsing so that we can parse the content.
// Remember whether we have demand.
stalled = notifyHeaders();
// Always return from HTTP parsing so that we
// can parse the content with the FCGI parser.
return true;
}

View File

@ -109,10 +109,11 @@ public class ClientGeneratorTest
}
@Override
public void onHeaders(int request)
public boolean onHeaders(int request)
{
assertEquals(id, request);
params.set(params.get() * primes[4]);
return false;
}
});

View File

@ -90,10 +90,11 @@ public class ClientParserTest
}
@Override
public void onHeaders(int request)
public boolean onHeaders(int request)
{
assertEquals(id, request);
params.set(params.get() * primes[2]);
return false;
}
});

View File

@ -144,7 +144,7 @@ public class ServerFCGIConnection extends AbstractConnection
}
@Override
public void onHeaders(int request)
public boolean onHeaders(int request)
{
HttpChannelOverFCGI channel = channels.get(request);
if (LOG.isDebugEnabled())
@ -154,6 +154,7 @@ public class ServerFCGIConnection extends AbstractConnection
channel.onRequest();
channel.dispatch();
}
return false;
}
@Override

View File

@ -105,12 +105,24 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
if (frame.isEndStream() || informational)
responseSuccess(exchange);
}
else
{
if (frame.isEndStream())
{
// There is no demand to trigger response success, so add
// a poison pill to trigger it when there will be demand.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
}
}
}
else // Response trailers.
{
HttpFields trailers = metaData.getFields();
trailers.forEach(httpResponse::trailer);
// Previous DataFrames had endStream=false, so
// add a poison pill to trigger response success
// after all normal DataFrames have been consumed.
notifyContent(exchange, new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP);
}
}
@ -200,7 +212,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
contentNotifier.offer(exchange, frame, callback);
}
private static class ContentNotifier
private class ContentNotifier
{
private final Queue<DataInfo> queue = new ArrayDeque<>();
private final HttpReceiverOverHTTP2 receiver;
@ -234,9 +246,25 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
private void process(boolean resume)
{
// Allow only one thread at a time.
if (active(resume))
boolean busy = active(resume);
if (LOG.isDebugEnabled())
LOG.debug("Resuming({}) processing({}) of content", resume, !busy);
if (busy)
return;
// Process only if there is demand.
synchronized (this)
{
if (!resume && demand() <= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Stalling processing, content available but no demand");
active = false;
stalled = true;
return;
}
}
while (true)
{
if (dataInfo != null)
@ -253,7 +281,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
{
dataInfo = queue.poll();
if (LOG.isDebugEnabled())
LOG.debug("Dequeued content {}", dataInfo);
LOG.debug("Processing content {}", dataInfo);
if (dataInfo == null)
{
active = false;
@ -269,8 +297,12 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
boolean proceed = receiver.responseContent(dataInfo.exchange, buffer, Callback.from(callback::succeeded, x -> fail(callback, x)));
if (!proceed)
{
// Should stall, unless just resumed.
if (stall())
// The call to responseContent() said we should
// stall, but another thread may have just resumed.
boolean stall = stall();
if (LOG.isDebugEnabled())
LOG.debug("Stalling({}) processing", stall);
if (stall)
return;
}
}
@ -287,27 +319,46 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
{
if (active)
{
// There is a thread in process(),
// but it may be about to exit, so
// remember "resume" to signal the
// processing thread to continue.
if (resume)
this.resume = true;
return true;
}
// If there is no demand (i.e. stalled
// and not resuming) then don't process.
if (stalled && !resume)
return true;
// Start processing.
active = true;
stalled = false;
return false;
}
}
/**
* Called when there is no demand, this method checks whether
* the processing should really stop or it should continue.
*
* @return true to stop processing, false to continue processing
*/
private boolean stall()
{
synchronized (this)
{
if (resume)
{
// There was no demand, but another thread
// just demanded, continue processing.
resume = false;
return false;
}
// There is no demand, stop processing.
active = false;
stalled = true;
return true;
@ -332,7 +383,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
receiver.responseFailure(failure);
}
private static class DataInfo
private class DataInfo
{
private final HttpExchange exchange;
private final DataFrame frame;

View File

@ -414,4 +414,119 @@ public class HttpClientDemandTest extends AbstractTest<TransportScenario>
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
assertArrayEquals(content, bytes);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testDelayedBeforeContentDemand(Transport transport) throws Exception
{
init(transport);
byte[] content = new byte[1024];
new Random().nextBytes(content);
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
response.setContentLength(content.length);
response.getOutputStream().write(content);
}
});
byte[] bytes = new byte[content.length];
ByteBuffer received = ByteBuffer.wrap(bytes);
AtomicReference<LongConsumer> beforeContentDemandRef = new AtomicReference<>();
CountDownLatch beforeContentLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
// Do not demand now.
beforeContentDemandRef.set(demand);
beforeContentLatch.countDown();
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer buffer, Callback callback)
{
contentLatch.countDown();
received.put(buffer);
callback.succeeded();
demand.accept(1);
}
})
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
assertTrue(beforeContentLatch.await(5, TimeUnit.SECONDS));
LongConsumer demand = beforeContentDemandRef.get();
// Content must not be notified until we demand.
assertFalse(contentLatch.await(1, TimeUnit.SECONDS));
demand.accept(1);
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
assertArrayEquals(content, bytes);
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testDelayedBeforeContentDemandWithNoResponseContent(Transport transport) throws Exception
{
init(transport);
scenario.start(new EmptyServerHandler());
AtomicReference<LongConsumer> beforeContentDemandRef = new AtomicReference<>();
CountDownLatch beforeContentLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
CountDownLatch resultLatch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.onResponseContentDemanded(new Response.DemandedContentListener()
{
@Override
public void onBeforeContent(Response response, LongConsumer demand)
{
// Do not demand now.
beforeContentDemandRef.set(demand);
beforeContentLatch.countDown();
}
@Override
public void onContent(Response response, LongConsumer demand, ByteBuffer buffer, Callback callback)
{
contentLatch.countDown();
callback.succeeded();
demand.accept(1);
}
})
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
resultLatch.countDown();
});
assertTrue(beforeContentLatch.await(5, TimeUnit.SECONDS));
LongConsumer demand = beforeContentDemandRef.get();
// Content must not be notified until we demand.
assertFalse(contentLatch.await(1, TimeUnit.SECONDS));
demand.accept(1);
// Content must not be notified as there is no content.
assertFalse(contentLatch.await(1, TimeUnit.SECONDS));
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
}
}