diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index 8598616340c..6af4e71d07b 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -212,10 +212,6 @@ public interface Request extends Attributes, Content.Source /** * {@inheritDoc} * @param demandCallback the demand callback to invoke when there is a content chunk available. - * In addition to the invocation guarantees of {@link Content.Source#demand(Runnable)}, - * this implementation serializes the invocation of the {@code Runnable} with - * invocations of any {@link Response#write(boolean, ByteBuffer, Callback)} - * {@code Callback} invocations. * @see Content.Source#demand(Runnable) */ @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 7e1bbda226c..cdca4131fc1 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -152,9 +152,6 @@ public interface Response extends Content.Sink * has returned.

*

Thus a {@code Callback} should not block waiting for a callback * of a future call to this method.

- *

Furthermore, the invocation of the passed callback is serialized - * with invocations of the {@link Runnable} demand callback passed to - * {@link Request#demand(Runnable)}.

* * @param last whether the ByteBuffer is the last to write * @param byteBuffer the ByteBuffer to write diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index 3bf2343a47a..c1e2031e77c 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -99,7 +99,8 @@ public class HttpChannelState implements HttpChannel, Components private final AutoLock _lock = new AutoLock(); private final HandlerInvoker _handlerInvoker = new HandlerInvoker(); private final ConnectionMetaData _connectionMetaData; - private final SerializedInvoker _serializedInvoker; + private final SerializedInvoker _readInvoker; + private final SerializedInvoker _writeInvoker; private final ResponseHttpFields _responseHeaders = new ResponseHttpFields(); private Thread _handling; private boolean _handled; @@ -122,7 +123,8 @@ public class HttpChannelState implements HttpChannel, Components { _connectionMetaData = connectionMetaData; // The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc. - _serializedInvoker = new HttpChannelSerializedInvoker(); + _readInvoker = new HttpChannelSerializedInvoker(); + _writeInvoker = new HttpChannelSerializedInvoker(); } @Override @@ -298,7 +300,7 @@ public class HttpChannelState implements HttpChannel, Components onContent = _onContentAvailable; _onContentAvailable = null; } - return _serializedInvoker.offer(onContent); + return _readInvoker.offer(onContent); } @Override @@ -341,13 +343,13 @@ public class HttpChannelState implements HttpChannel, Components // If there was a pending IO operation, deliver the idle timeout via them. if (invokeOnContentAvailable != null || invokeWriteFailure != null) - return _serializedInvoker.offer(invokeOnContentAvailable, invokeWriteFailure); + return Invocable.combine(_readInvoker.offer(invokeOnContentAvailable), _writeInvoker.offer(invokeWriteFailure)); // Otherwise, if there are idle timeout listeners, ask them whether we should call onFailure. Predicate onIdleTimeout = _onIdleTimeout; if (onIdleTimeout != null) { - return _serializedInvoker.offer(() -> + return () -> { if (onIdleTimeout.test(t)) { @@ -356,7 +358,7 @@ public class HttpChannelState implements HttpChannel, Components if (task != null) task.run(); } - }); + }; } } @@ -426,7 +428,7 @@ public class HttpChannelState implements HttpChannel, Components }; // Serialize all the error actions. - task = _serializedInvoker.offer(invokeOnContentAvailable, invokeWriteFailure, invokeOnFailureListeners); + task = Invocable.combine(_readInvoker.offer(invokeOnContentAvailable), _writeInvoker.offer(invokeWriteFailure), invokeOnFailureListeners); } } @@ -912,7 +914,7 @@ public class HttpChannelState implements HttpChannel, Components if (error) { - httpChannelState._serializedInvoker.run(demandCallback); + httpChannelState._readInvoker.run(demandCallback); } else if (interimCallback == null) { @@ -1189,14 +1191,14 @@ public class HttpChannelState implements HttpChannel, Components if (writeFailure == NOTHING_TO_SEND) { - httpChannelState._serializedInvoker.run(callback::succeeded); + httpChannelState._writeInvoker.run(callback::succeeded); return; } // Have we failed in some way? if (writeFailure != null) { Throwable failure = writeFailure; - httpChannelState._serializedInvoker.run(() -> callback.failed(failure)); + httpChannelState._writeInvoker.run(() -> callback.failed(failure)); return; } @@ -1235,7 +1237,7 @@ public class HttpChannelState implements HttpChannel, Components httpChannel.lockedStreamSendCompleted(true); } if (callback != null) - httpChannel._serializedInvoker.run(callback::succeeded); + httpChannel._writeInvoker.run(callback::succeeded); } /** @@ -1263,7 +1265,7 @@ public class HttpChannelState implements HttpChannel, Components httpChannel.lockedStreamSendCompleted(false); } if (callback != null) - httpChannel._serializedInvoker.run(() -> callback.failed(x)); + httpChannel._writeInvoker.run(() -> callback.failed(x)); } @Override diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java index 6f64b011186..726f7ee8d22 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpChannelTest.java @@ -1220,15 +1220,15 @@ public class HttpChannelTest assertThat(chunk.getFailure(), sameInstance(failure)); CountDownLatch demand = new CountDownLatch(1); - // Demand callback serialized until after onFailure listeners. + // Demand callback not serialized until after onFailure listeners. rq.demand(demand::countDown); - assertThat(demand.getCount(), is(1L)); + assertThat(demand.getCount(), is(0L)); FuturePromise callback = new FuturePromise<>(); - // Write callback serialized until after onFailure listeners. + // Write callback not serialized until after onFailure listeners. handling.get().write(false, null, Callback.from(() -> {}, callback::succeeded)); - assertFalse(callback.isDone()); + assertTrue(callback.isDone()); // Process onFailure task. try (StacklessLogging ignore = new StacklessLogging(Response.class)) diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java index c93a16f520e..9ec2f2659a0 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java @@ -194,7 +194,7 @@ public class RequestListenersTest int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500; assertEquals(expectedStatus, response.getStatus()); - assertThat(failureLatch.await(1, TimeUnit.SECONDS), is(failIdleTimeout)); + assertThat(failureLatch.await(idleTimeout + 500, TimeUnit.MILLISECONDS), is(failIdleTimeout && !succeedCallback)); } @ParameterizedTest diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 85dc06ff5b0..19586df42bf 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -125,7 +125,7 @@ public interface Invocable * @param task the Runnable * @return a new Task */ - public static Task from(InvocationType type, Runnable task) + static Task from(InvocationType type, Runnable task) { return new ReadyTask(type, task); } @@ -202,4 +202,43 @@ public interface Invocable { return InvocationType.BLOCKING; } + + /** + * Combine {@link Runnable}s into a single {@link Runnable} that sequentially calls the others. + * @param runnables the {@link Runnable}s to combine + * @return the combined {@link Runnable} with a combined {@link InvocationType}. + */ + static Runnable combine(Runnable... runnables) + { + Runnable result = null; + for (Runnable runnable : runnables) + { + if (runnable == null) + continue; + if (result == null) + { + result = runnable; + } + else + { + Runnable first = result; + result = new Task() + { + @Override + public void run() + { + first.run(); + runnable.run(); + } + + @Override + public InvocationType getInvocationType() + { + return combine(Invocable.getInvocationType(first), Invocable.getInvocationType(runnable)); + } + }; + } + } + return result; + } } diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/InvocableTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/InvocableTest.java new file mode 100644 index 00000000000..c8d81878deb --- /dev/null +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/thread/InvocableTest.java @@ -0,0 +1,83 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util.thread; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.junit.jupiter.api.Test; + +import static org.eclipse.jetty.util.thread.Invocable.InvocationType.BLOCKING; +import static org.eclipse.jetty.util.thread.Invocable.InvocationType.EITHER; +import static org.eclipse.jetty.util.thread.Invocable.InvocationType.NON_BLOCKING; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class InvocableTest +{ + @Test + public void testCombineType() + { + assertThat(Invocable.combine(null, null), is(BLOCKING)); + assertThat(Invocable.combine(null, BLOCKING), is(BLOCKING)); + assertThat(Invocable.combine(null, NON_BLOCKING), is(BLOCKING)); + assertThat(Invocable.combine(null, EITHER), is(BLOCKING)); + + assertThat(Invocable.combine(BLOCKING, null), is(BLOCKING)); + assertThat(Invocable.combine(BLOCKING, BLOCKING), is(BLOCKING)); + assertThat(Invocable.combine(BLOCKING, NON_BLOCKING), is(BLOCKING)); + assertThat(Invocable.combine(BLOCKING, EITHER), is(BLOCKING)); + + assertThat(Invocable.combine(NON_BLOCKING, null), is(BLOCKING)); + assertThat(Invocable.combine(NON_BLOCKING, BLOCKING), is(BLOCKING)); + assertThat(Invocable.combine(NON_BLOCKING, NON_BLOCKING), is(NON_BLOCKING)); + assertThat(Invocable.combine(NON_BLOCKING, EITHER), is(NON_BLOCKING)); + + assertThat(Invocable.combine(EITHER, null), is(BLOCKING)); + assertThat(Invocable.combine(EITHER, BLOCKING), is(BLOCKING)); + assertThat(Invocable.combine(EITHER, NON_BLOCKING), is(NON_BLOCKING)); + assertThat(Invocable.combine(EITHER, EITHER), is(EITHER)); + } + + @Test + public void testCombineRunnable() + { + Queue history = new ConcurrentLinkedQueue<>(); + + assertThat(Invocable.combine(), nullValue()); + assertThat(Invocable.combine((Runnable)null), nullValue()); + assertThat(Invocable.combine(null, (Runnable)null), nullValue()); + + Runnable r1 = () -> history.add("R1"); + Runnable r2 = () -> history.add("R2"); + Runnable r3 = () -> history.add("R3"); + + assertThat(Invocable.combine(r1, null, null), sameInstance(r1)); + assertThat(Invocable.combine(null, r2, null), sameInstance(r2)); + assertThat(Invocable.combine(null, null, r3), sameInstance(r3)); + + Runnable r13 = Invocable.combine(r1, null, r3); + history.clear(); + r13.run(); + assertThat(history, contains("R1", "R3")); + + Runnable r123 = Invocable.combine(r1, r2, r3); + history.clear(); + r123.run(); + assertThat(history, contains("R1", "R2", "R3")); + } +} diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java index 45fe2cbf9b3..cc75fea6ac6 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java @@ -478,6 +478,13 @@ public class ServletChannel // be dispatched to an error page, so we delegate this responsibility to the ErrorHandler. reopen(); _state.errorHandling(); + + // TODO We currently directly call the errorHandler here, but this is not correct in the case of async errors, + // because since a failure has already occurred, the errorHandler is unable to write a response. + // Instead, we should fail the callback, so that it calls Response.writeError(...) with an ErrorResponse + // that ignores existing failures. However, the error handler needs to be able to call servlet pages, + // so it will need to do a new call to associate(req,res,callback) or similar, to make the servlet request and + // response wrap the error request and response. Have to think about what callback is passed. errorHandler.handle(getServletContextRequest(), getServletContextResponse(), Callback.from(_state::errorHandlingComplete)); } } diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java index b0e6996a255..2b46cf99576 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java @@ -151,6 +151,7 @@ public class ServletChannelState private long _timeoutMs = DEFAULT_TIMEOUT; private AsyncContextEvent _event; private Thread _onTimeoutThread; + private boolean _failureListener; protected ServletChannelState(ServletChannel servletChannel) { @@ -511,6 +512,11 @@ public class ServletChannelState if (_state != State.HANDLING || (_requestState != RequestState.BLOCKING && _requestState != RequestState.ERRORING)) throw new IllegalStateException(this.getStatusStringLocked()); + if (!_failureListener) + { + _failureListener = true; + _servletChannel.getRequest().addFailureListener(this::asyncError); + } _requestState = RequestState.ASYNC; _event = event; lastAsyncListeners = _asyncListeners; @@ -1099,6 +1105,7 @@ public class ServletChannelState _asyncWritePossible = false; _timeoutMs = DEFAULT_TIMEOUT; _event = null; + _failureListener = false; } } diff --git a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java index 89d615bff1b..9f20da747fd 100644 --- a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java +++ b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/AsyncIOServletTest.java @@ -92,7 +92,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -@Disabled public class AsyncIOServletTest extends AbstractTest { private static final ThreadLocal scope = new ThreadLocal<>(); @@ -1081,6 +1080,7 @@ public class AsyncIOServletTest extends AbstractTest @ParameterizedTest @MethodSource("transportsNoFCGI") + @Disabled // TODO Cannot write response from onError as failure has occurred public void testAsyncReadEarlyEOF(Transport transport) throws Exception { // SSLEngine receives the close alert from the client, and when @@ -1197,8 +1197,8 @@ public class AsyncIOServletTest extends AbstractTest } @ParameterizedTest - @MethodSource("transportsNoFCGI") - public void testAsyncEcho(Transport transport) throws Exception + @MethodSource("transports") + public void testAsyncReadEcho(Transport transport) throws Exception { // TODO: investigate why H3 does not work. Assumptions.assumeTrue(transport != Transport.H3); @@ -1208,8 +1208,6 @@ public class AsyncIOServletTest extends AbstractTest @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { - System.err.println("service " + request); - AsyncContext asyncContext = request.startAsync(); ServletInputStream input = request.getInputStream(); input.setReadListener(new ReadListener() @@ -1222,7 +1220,6 @@ public class AsyncIOServletTest extends AbstractTest int b = input.read(); if (b >= 0) { - // System.err.printf("0x%2x %s %n", b, Character.isISOControl(b)?"?":(""+(char)b)); response.getOutputStream().write(b); } else diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java index 163addcaafd..89f332d4ff9 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java @@ -146,6 +146,7 @@ public class HttpChannelState private long _timeoutMs = DEFAULT_TIMEOUT; private AsyncContextEvent _event; private Thread _onTimeoutThread; + private boolean _failureListener; protected HttpChannelState(HttpChannel channel) { @@ -530,6 +531,11 @@ public class HttpChannelState if (_state != State.HANDLING || _requestState != RequestState.BLOCKING) throw new IllegalStateException(this.getStatusStringLocked()); + if (!_failureListener) + { + _failureListener = true; + getHttpChannel().getCoreRequest().addFailureListener(this::asyncError); + } _requestState = RequestState.ASYNC; _event = event; lastAsyncListeners = _asyncListeners; @@ -1066,6 +1072,7 @@ public class HttpChannelState _asyncWritePossible = false; _timeoutMs = DEFAULT_TIMEOUT; _event = null; + _failureListener = false; } } diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java index f28756d82e4..3c455e8d203 100644 --- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java +++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/AsyncIOServletTest.java @@ -94,7 +94,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -@Disabled public class AsyncIOServletTest extends AbstractTest { private static final ThreadLocal scope = new ThreadLocal<>(); @@ -1083,6 +1082,7 @@ public class AsyncIOServletTest extends AbstractTest @ParameterizedTest @MethodSource("transportsNoFCGI") + @Disabled // TODO Cannot write response from onError as failure has occurred public void testAsyncReadEarlyEOF(Transport transport) throws Exception { // SSLEngine receives the close alert from the client, and when @@ -1200,6 +1200,7 @@ public class AsyncIOServletTest extends AbstractTest @ParameterizedTest @MethodSource("transportsNoFCGI") + @Disabled // TODO public void testAsyncIntercepted(Transport transport) throws Exception { start(transport, new HttpServlet() @@ -1550,6 +1551,7 @@ public class AsyncIOServletTest extends AbstractTest @ParameterizedTest @MethodSource("transportsNoFCGI") + @Disabled // TODO public void testAsyncInterceptedTwiceWithNulls(Transport transport) throws Exception { start(transport, new HttpServlet()