Now also the HttpReceiver.responseContentAvailable() is serialized, so that the access to `this.contentSource` is serialized with failure, and protected by a call to `exchange.isResponseCompleteOrTerminated()`. Before, it was possible that a thread failed the response, nulling out `this.contentSource`, while another thread was just about to call `responseContentAvailable()` -- this was the case for HTTP/2 in particular, where content is notified asynchronously, rather than being created by a call to `ContentSource.read()`. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
661546ecff
commit
d0ea445d2d
|
@ -315,13 +315,35 @@ public abstract class HttpReceiver
|
|||
* Method to be invoked when response content is available to be read.
|
||||
* <p>
|
||||
* This method takes care of ensuring the {@link Content.Source} passed to
|
||||
* {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} calls the
|
||||
* demand callback.
|
||||
* {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)}
|
||||
* calls the demand callback.
|
||||
* The call to the demand callback is serialized with other events.
|
||||
*/
|
||||
protected void responseContentAvailable(HttpExchange exchange)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Invoking responseContentAvailable on {}", this);
|
||||
|
||||
invoker.run(() ->
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Executing responseContentAvailable on {}", this);
|
||||
|
||||
if (exchange.isResponseCompleteOrTerminated())
|
||||
return;
|
||||
|
||||
responseContentAvailable();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to be invoked when response content is available to be read.
|
||||
* <p>
|
||||
* This method directly invokes the demand callback, assuming the caller
|
||||
* is already serialized with other events.
|
||||
*/
|
||||
protected void responseContentAvailable()
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Response content available on {}", this);
|
||||
contentSource.onDataAvailable();
|
||||
}
|
||||
|
||||
|
@ -344,7 +366,7 @@ public abstract class HttpReceiver
|
|||
if (!exchange.responseComplete(null))
|
||||
return;
|
||||
|
||||
invoker.run(() ->
|
||||
Runnable successTask = () ->
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Executing responseSuccess on {}", this);
|
||||
|
@ -365,7 +387,12 @@ public abstract class HttpReceiver
|
|||
// Mark atomically the response as terminated, with
|
||||
// respect to concurrency between request and response.
|
||||
terminateResponse(exchange);
|
||||
}, afterSuccessTask);
|
||||
};
|
||||
|
||||
if (afterSuccessTask == null)
|
||||
invoker.run(successTask);
|
||||
else
|
||||
invoker.run(successTask, afterSuccessTask);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -712,9 +739,9 @@ public abstract class HttpReceiver
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onDataAvailable on {}", this);
|
||||
// The demandCallback will call read() that will itself call
|
||||
// HttpReceiver.read(boolean) so it must be called by the invoker.
|
||||
invokeDemandCallback(true);
|
||||
// The onDataAvailable() method is only ever called
|
||||
// by the invoker so avoid using the invoker again.
|
||||
invokeDemandCallback(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -760,8 +787,8 @@ public abstract class HttpReceiver
|
|||
}
|
||||
}
|
||||
|
||||
// The processDemand method is only ever called by the
|
||||
// invoker so there is no need to use the latter here.
|
||||
// The processDemand() method is only ever called
|
||||
// by the invoker so avoid using the invoker again.
|
||||
invokeDemandCallback(false);
|
||||
}
|
||||
|
||||
|
@ -769,20 +796,19 @@ public abstract class HttpReceiver
|
|||
{
|
||||
Runnable demandCallback = demandCallbackRef.getAndSet(null);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Invoking demand callback on {}", this);
|
||||
if (demandCallback != null)
|
||||
LOG.debug("Invoking demand callback {} on {}", demandCallback, this);
|
||||
if (demandCallback == null)
|
||||
return;
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
if (invoke)
|
||||
invoker.run(demandCallback);
|
||||
else
|
||||
demandCallback.run();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
fail(x);
|
||||
}
|
||||
if (invoke)
|
||||
invoker.run(demandCallback);
|
||||
else
|
||||
demandCallback.run();
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
fail(x);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -79,7 +79,9 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
|
|||
}
|
||||
else
|
||||
{
|
||||
responseContentAvailable();
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
responseContentAvailable(exchange);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,9 @@ public class HttpReceiverOverFCGI extends HttpReceiver
|
|||
}
|
||||
else
|
||||
{
|
||||
responseContentAvailable();
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
responseContentAvailable(exchange);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,6 +109,9 @@ public class HttpReceiverOverFCGI extends HttpReceiver
|
|||
|
||||
void content(Content.Chunk chunk)
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange == null)
|
||||
return;
|
||||
if (this.chunk != null)
|
||||
throw new IllegalStateException();
|
||||
// Retain the chunk because it is stored for later reads.
|
||||
|
|
|
@ -50,8 +50,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class);
|
||||
|
||||
private final Runnable onDataAvailableTask = new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, this::responseContentAvailable);
|
||||
|
||||
public HttpReceiverOverHTTP2(HttpChannel channel)
|
||||
{
|
||||
super(channel);
|
||||
|
@ -213,7 +211,7 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel.
|
|||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange == null)
|
||||
return null;
|
||||
return onDataAvailableTask;
|
||||
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseContentAvailable(exchange));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.ArrayList;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import org.eclipse.jetty.client.AsyncRequestContent;
|
||||
import org.eclipse.jetty.client.CompletableResponseListener;
|
||||
import org.eclipse.jetty.client.Connection;
|
||||
import org.eclipse.jetty.client.ContentResponse;
|
||||
import org.eclipse.jetty.client.Destination;
|
||||
|
@ -793,6 +796,37 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest
|
|||
assertThat(onContentSourceErrorRef.get(), is(nullValue()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestContentResponseContent() throws Exception
|
||||
{
|
||||
start(new Handler.Abstract()
|
||||
{
|
||||
@Override
|
||||
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
|
||||
{
|
||||
Content.copy(request, response, callback);
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
AsyncRequestContent content = new AsyncRequestContent();
|
||||
var request = httpClient.newRequest("localhost", connector.getLocalPort())
|
||||
.method(HttpMethod.POST)
|
||||
.body(content);
|
||||
CompletableFuture<ContentResponse> completable = new CompletableResponseListener(request).send();
|
||||
|
||||
for (int i = 0; i < 16; ++i)
|
||||
{
|
||||
content.write(false, ByteBuffer.allocate(512), Callback.NOOP);
|
||||
Thread.sleep(10);
|
||||
}
|
||||
content.close();
|
||||
|
||||
ContentResponse response = completable.get(15, TimeUnit.SECONDS);
|
||||
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Tag("external")
|
||||
public void testExternalServer() throws Exception
|
||||
|
|
|
@ -127,7 +127,7 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client
|
|||
if (exchange == null)
|
||||
return;
|
||||
|
||||
responseContentAvailable();
|
||||
responseContentAvailable(exchange);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue